hadoop-hdfs-commits mailing list archives

From jiten...@apache.org
Subject svn commit: r1066305 [1/3] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Wed, 02 Feb 2011 02:05:19 GMT
Author: jitendra
Date: Wed Feb  2 02:05:18 2011
New Revision: 1066305

URL: http://svn.apache.org/viewvc?rev=1066305&view=rev
Log:
HDFS-1557. Separate Storage from FSImage. Contributed by Ivan Kelly.


Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
Removed:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
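
The thrust of the patch, visible throughout the hunks below, is that FSImage no longer carries the storage bookkeeping itself (previously it was usable wherever a StorageInfo was expected); callers now reach those fields through a new NNStorage object exposed via FSImage.getStorage(). The following is a minimal, self-contained sketch of that composition-over-inheritance pattern; the class and member names are simplified stand-ins, not the real HDFS types, which carry many more fields and methods.

// Illustrative sketch only: storage bookkeeping moves out of the image
// class into a dedicated storage class reached through getStorage().
class NNStorageSketch {
  private long checkpointTime;

  long getCheckpointTime() { return checkpointTime; }
  void setCheckpointTime(long t) { checkpointTime = t; }
}

class FSImageSketch {
  // Composition replaces inheritance: the image holds storage state
  // instead of being the storage state.
  private final NNStorageSketch storage = new NNStorageSketch();

  NNStorageSketch getStorage() { return storage; }
}

public class DelegationDemo {
  public static void main(String[] args) {
    FSImageSketch image = new FSImageSketch();
    // Before this patch callers wrote image.getCheckpointTime();
    // after it they write image.getStorage().getCheckpointTime().
    image.getStorage().setCheckpointTime(42L);
    System.out.println(image.getStorage().getCheckpointTime());
  }
}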

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Feb  2 02:05:18 2011
@@ -49,6 +49,8 @@ Trunk (unreleased changes)
 
     HDFS-1335. HDFS side change of HADOOP-6904: RPC compatibility. (hairong)
 
+    HDFS-1557. Separate Storage from FSImage. (Ivan Kelly via jitendra)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Feb  2 02:05:18 2011
@@ -65,7 +65,7 @@ public abstract class Storage extends St
   // Constants
   
   // last layout version that did not support upgrades
-  protected static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
+  public static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
   
   // this corresponds to Hadoop-0.14.
   public static final int LAST_UPGRADABLE_LAYOUT_VERSION = -7;
@@ -710,7 +710,7 @@ public abstract class Storage extends St
    * 
    * @param oldVersion
    */
-  protected static void checkVersionUpgradable(int oldVersion) 
+  public static void checkVersionUpgradable(int oldVersion) 
                                      throws IOException {
     if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) {
       String msg = "*********** Upgrade is not supported from this older" +
@@ -785,7 +785,13 @@ public abstract class Storage extends St
                             + from.getCanonicalPath() + " to " + to.getCanonicalPath());
   }
 
-  protected static void deleteDir(File dir) throws IOException {
+  /**
+   * Recursively delete the contents of the directory and then the
+   * directory itself from the local filesystem.
+   * @param dir The directory to delete
+   * @throws IOException
+   */
+  public static void deleteDir(File dir) throws IOException {
     if (!FileUtil.fullyDelete(dir))
       throw new IOException("Failed to delete " + dir.getCanonicalPath());
   }
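
The hunk above widens LAST_PRE_UPGRADE_LAYOUT_VERSION, checkVersionUpgradable and deleteDir from protected to public, presumably so the new NNStorage class (which lives in the namenode package rather than common) can reach them. As a rough, self-contained illustration of the recursive-delete contract the new Javadoc describes (the real method delegates to org.apache.hadoop.fs.FileUtil.fullyDelete; this stand-alone version is only a sketch):

import java.io.File;
import java.io.IOException;

public class DeleteDirSketch {
  /**
   * Recursively delete the contents of a directory, then the directory
   * itself. Mirrors the contract of Storage.deleteDir, which delegates
   * to FileUtil.fullyDelete in the real code.
   */
  public static void deleteDir(File dir) throws IOException {
    File[] children = dir.listFiles();
    if (children != null) {        // null for plain files or on I/O error
      for (File child : children) {
        deleteDir(child);
      }
    }
    if (!dir.delete()) {
      throw new IOException("Failed to delete " + dir.getCanonicalPath());
    }
  }

  public static void main(String[] args) throws IOException {
    File tmp = new File("tmp-delete-demo");
    new File(tmp, "sub").mkdirs();   // create a small tree to remove
    deleteDir(tmp);
    System.out.println("deleted: " + !tmp.exists());
  }
}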

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1066305&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Feb  2 02:05:18 2011
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Extension of FSImage for the backup node.
+ * This class handles the setup of the journaling 
+ * spool on the backup namenode.
+ */
+@InterfaceAudience.Private
+public class BackupImage extends FSImage {
+  // Names of the journal spool directory and the spool file
+  private static final String STORAGE_JSPOOL_DIR = "jspool";
+  private static final String STORAGE_JSPOOL_FILE =
+    NNStorage.NameNodeFile.EDITS_NEW.getName();
+
+  /** Backup input stream for loading edits into memory */
+  private EditLogBackupInputStream backupInputStream;
+
+  /** Current state of journal spooling */
+  volatile JSpoolState jsState;
+
+  static enum JSpoolState {
+    OFF,
+    INPROGRESS,
+    WAIT;
+  }
+
+  /**
+   * Create a backup image with journal spooling initially off.
+   */
+  BackupImage() {
+    super();
+    storage.setDisablePreUpgradableLayoutCheck(true);
+    jsState = JSpoolState.OFF;
+  }
+
+  /**
+   * Analyze backup storage directories for consistency.<br>
+   * Recover from incomplete checkpoints if required.<br>
+   * Read VERSION and fstime files if they exist.<br>
+   * Do not load image or edits.
+   *
+   * @param imageDirs list of image directories as URI.
+   * @param editsDirs list of edits directories as URI.
+   * @throws IOException if the node should shut down.
+   */
+  void recoverCreateRead(Collection<URI> imageDirs,
+                         Collection<URI> editsDirs) throws IOException {
+    storage.setStorageDirectories(imageDirs, editsDirs);
+    storage.setCheckpointTime(0L);
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      StorageState curState;
+      try {
+        curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
+        // sd is locked but not opened
+        switch(curState) {
+        case NON_EXISTENT:
+          // fail if any of the configured storage dirs are inaccessible
+          throw new InconsistentFSStateException(sd.getRoot(),
+                "checkpoint directory does not exist or is not accessible.");
+        case NOT_FORMATTED:
+          // for backup node all directories may be unformatted initially
+          LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
+          LOG.info("Formatting ...");
+          sd.clearDirectory(); // create empty current
+          break;
+        case NORMAL:
+          break;
+        default:  // recovery is possible
+          sd.doRecover(curState);
+        }
+        if(curState != StorageState.NOT_FORMATTED) {
+          sd.read(); // read and verify consistency with other directories
+        }
+      } catch(IOException ioe) {
+        sd.unlock();
+        throw ioe;
+      }
+    }
+  }
+
+  /**
+   * Reset storage directories.
+   * <p>
+   * Unlock the storage.
+   * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
+   * and recreate empty <code>current</code>.
+   * @throws IOException
+   */
+  synchronized void reset() throws IOException {
+    // reset NameSpace tree
+    FSDirectory fsDir = getFSNamesystem().dir;
+    fsDir.reset();
+
+    // unlock, close and rename storage directories
+    storage.unlockAll();
+    // recover from unsuccessful checkpoint if necessary
+    recoverCreateRead(storage.getImageDirectories(),
+                      storage.getEditsDirectories());
+    // rename and recreate
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      // rename current to lastcheckpoint.tmp
+      storage.moveCurrent(sd);
+    }
+  }
+
+  /**
+   * Load checkpoint from local files only if the memory state is empty.<br>
+   * Set new checkpoint time received from the name-node.<br>
+   * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+   * @throws IOException
+   */
+  void loadCheckpoint(CheckpointSignature sig) throws IOException {
+    // load current image and journal if it is not in memory already
+    if(!editLog.isOpen())
+      editLog.open();
+
+    FSDirectory fsDir = getFSNamesystem().dir;
+    if(fsDir.isEmpty()) {
+      Iterator<StorageDirectory> itImage
+        = storage.dirIterator(NameNodeDirType.IMAGE);
+      Iterator<StorageDirectory> itEdits
+        = storage.dirIterator(NameNodeDirType.EDITS);
+      if(!itImage.hasNext() || ! itEdits.hasNext())
+        throw new IOException("Could not locate checkpoint directories");
+      StorageDirectory sdName = itImage.next();
+      StorageDirectory sdEdits = itEdits.next();
+      getFSDirectoryRootLock().writeLock();
+      try { // load image under rootDir lock
+        loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
+      } finally {
+        getFSDirectoryRootLock().writeUnlock();
+      }
+      loadFSEdits(sdEdits);
+    }
+
+    // set storage fields
+    storage.setStorageInfo(sig);
+    storage.setImageDigest(sig.imageDigest);
+    storage.setCheckpointTime(sig.checkpointTime);
+  }
+
+  /**
+   * Save meta-data into fsimage files and create empty edits.
+   */
+  void saveCheckpoint() throws IOException {
+    saveNamespace(false);
+  }
+
+  private FSDirectory getFSDirectoryRootLock() {
+    return getFSNamesystem().dir;
+  }
+
+  static File getJSpoolDir(StorageDirectory sd) {
+    return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
+  }
+
+  static File getJSpoolFile(StorageDirectory sd) {
+    return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+  }
+
+  /**
+   * Journal writer journals new meta-data state.
+   * <ol>
+   * <li> If Journal Spool state is OFF then journal records (edits)
+   * are applied directly to meta-data state in memory and are written
+   * to the edits file(s).</li>
+   * <li> If Journal Spool state is INPROGRESS then records are only
+   * written to the edits.new file; this is called spooling.</li>
+   * <li> Journal Spool state WAIT blocks journaling until the
+   * Journal Spool reader finalizes merging of the spooled data and
+   * switches to applying journal to memory.</li>
+   * </ol>
+   * @param length length of data.
+   * @param data serialized journal records.
+   * @throws IOException
+   * @see #convergeJournalSpool()
+   */
+  synchronized void journal(int length, byte[] data) throws IOException {
+    assert backupInputStream.length() == 0 : "backup input stream is not empty";
+    try {
+      switch(jsState) {
+        case WAIT:
+        case OFF:
+          // wait until spooling is off
+          waitSpoolEnd();
+          // update NameSpace in memory
+          backupInputStream.setBytes(data);
+          FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+          logLoader.loadEditRecords(storage.getLayoutVersion(),
+                    backupInputStream.getDataInputStream(), true);
+          getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
+          break;
+        case INPROGRESS:
+          break;
+      }
+      // write to files
+      editLog.logEdit(length, data);
+      editLog.logSync();
+    } finally {
+      backupInputStream.clear();
+    }
+  }
+
+  private synchronized void waitSpoolEnd() {
+    while(jsState == JSpoolState.WAIT) {
+      try {
+        wait();
+      } catch (InterruptedException  e) {}
+    }
+    // now spooling should be off, verifying just in case
+    assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+  }
+
+  /**
+   * Start journal spool.
+   * Switch to writing into edits.new instead of edits.
+   *
+   * The edits.new file used for spooling lives in the separate "jspool"
+   * directory rather than in "current", because the two directories must be
+   * independent: a checkpoint can happen while spooling, in which case
+   * "current" is first renamed to lastcheckpoint.tmp and then to
+   * previous.checkpoint, while jspool/edits.new stays in place throughout.
+   */
+  synchronized void startJournalSpool(NamenodeRegistration nnReg)
+  throws IOException {
+    switch(jsState) {
+      case OFF:
+        break;
+      case INPROGRESS:
+        return;
+      case WAIT:
+        waitSpoolEnd();
+    }
+
+    // create journal spool directories
+    for (Iterator<StorageDirectory> it
+           = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      File jsDir = getJSpoolDir(sd);
+      if (!jsDir.exists() && !jsDir.mkdirs()) {
+        throw new IOException("Mkdirs failed to create "
+                              + jsDir.getCanonicalPath());
+      }
+      // create edit file if missing
+      File eFile = storage.getEditFile(sd);
+      if(!eFile.exists()) {
+        editLog.createEditLogFile(eFile);
+      }
+    }
+
+    if(!editLog.isOpen())
+      editLog.open();
+
+    // create streams pointing to the journal spool files
+    // subsequent journal records will go directly to the spool
+    editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+    setCheckpointState(CheckpointStates.ROLLED_EDITS);
+
+    // set up spooling
+    if(backupInputStream == null)
+      backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
+    jsState = JSpoolState.INPROGRESS;
+  }
+
+  synchronized void setCheckpointTime(int length, byte[] data)
+  throws IOException {
+    assert backupInputStream.length() == 0 : "backup input stream is not empty";
+    try {
+      // unpack new checkpoint time
+      backupInputStream.setBytes(data);
+      DataInputStream in = backupInputStream.getDataInputStream();
+      byte op = in.readByte();
+      assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
+      LongWritable lw = new LongWritable();
+      lw.readFields(in);
+      storage.setCheckpointTimeInStorage(lw.get());
+    } finally {
+      backupInputStream.clear();
+    }
+  }
+
+  /**
+   * Merge Journal Spool to memory.<p>
+   * Journal Spool reader reads journal records from edits.new.
+   * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
+   * This blocks journaling (see {@link #journal(int,byte[])}).
+   * The reader
+   * <ul>
+   * <li> reads remaining journal records if any,</li>
+   * <li> renames edits.new to edits,</li>
+   * <li> sets {@link JSpoolState} to OFF,</li>
+   * <li> and notifies the journaling thread.</li>
+   * </ul>
+   * Journaling resumes with applying new journal records to the memory state,
+   * and writing them into edits file(s).
+   */
+  void convergeJournalSpool() throws IOException {
+    Iterator<StorageDirectory> itEdits
+      = storage.dirIterator(NameNodeDirType.EDITS);
+    if(! itEdits.hasNext())
+      throw new IOException("Could not locate checkpoint directories");
+    StorageDirectory sdEdits = itEdits.next();
+    int numEdits = 0;
+    File jSpoolFile = getJSpoolFile(sdEdits);
+    long startTime = now();
+    if(jSpoolFile.exists()) {
+      // load edits.new
+      EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
+      DataInputStream in = edits.getDataInputStream();
+      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+      numEdits += logLoader.loadFSEdits(in, false);
+
+      // first time reached the end of spool
+      jsState = JSpoolState.WAIT;
+      numEdits += logLoader.loadEditRecords(storage.getLayoutVersion(),
+                                            in, true);
+      getFSNamesystem().dir.updateCountForINodeWithQuota();
+      edits.close();
+    }
+
+    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
+        + " of size " + jSpoolFile.length() + " edits # " + numEdits
+        + " loaded in " + (now()-startTime)/1000 + " seconds.");
+
+    // rename the spooled edits.new to edits, bringing it in sync with the active node
+    // subsequent journal records will go directly to edits
+    editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+
+    // write version file
+    resetVersion(false, storage.getImageDigest());
+
+    // wake up journal writer
+    synchronized(this) {
+      jsState = JSpoolState.OFF;
+      notifyAll();
+    }
+
+    // Rename lastcheckpoint.tmp to previous.checkpoint
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      storage.moveLastCheckpoint(sd);
+    }
+  }
+}
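
BackupImage coordinates the journal writer and the spool reader through the three JSpoolState values and a plain wait()/notifyAll() handshake, as waitSpoolEnd() and the tail of convergeJournalSpool() above show. Below is a minimal, self-contained reduction of that handshake; the class is a toy, not the real BackupImage.

// Toy reduction of the JSpoolState handshake: the journal writer blocks
// while the state is WAIT; the spool reader flips the state to OFF and
// wakes all waiters, the same wait()/notifyAll() shape used above.
public class SpoolHandshakeSketch {
  enum JSpoolState { OFF, INPROGRESS, WAIT }

  private volatile JSpoolState jsState = JSpoolState.WAIT;

  // Journal writer side: mirrors waitSpoolEnd().
  synchronized void waitSpoolEnd() {
    while (jsState == JSpoolState.WAIT) {
      try {
        wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Spool reader side: mirrors the tail of convergeJournalSpool().
  synchronized void finishSpool() {
    jsState = JSpoolState.OFF;
    notifyAll();   // wake up the blocked journal writer
  }

  public static void main(String[] args) throws Exception {
    SpoolHandshakeSketch s = new SpoolHandshakeSketch();
    Thread writer = new Thread(() -> {
      s.waitSpoolEnd();
      System.out.println("spooling finished, journaling resumes");
    });
    writer.start();
    Thread.sleep(100);   // let the writer block
    s.finishSpool();
    writer.join();
  }
}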

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Wed Feb  2 02:05:18 2011
@@ -114,7 +114,7 @@ public class BackupNode extends NameNode
 
   @Override // NameNode
   protected void loadNamesystem(Configuration conf) throws IOException {
-    BackupStorage bnImage = new BackupStorage();
+    BackupImage bnImage = new BackupImage();
     this.namesystem = new FSNamesystem(conf, bnImage);
     bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
                               FSNamesystem.getNamespaceEditsDirs(conf));
@@ -206,7 +206,7 @@ public class BackupNode extends NameNode
     if(!nnRpcAddress.equals(nnReg.getAddress()))
       throw new IOException("Journal request from unexpected name-node: "
           + nnReg.getAddress() + " expecting " + nnRpcAddress);
-    BackupStorage bnImage = (BackupStorage)getFSImage();
+    BackupImage bnImage = (BackupImage)getFSImage();
     switch(jAction) {
       case (int)JA_IS_ALIVE:
         return;
@@ -228,8 +228,8 @@ public class BackupNode extends NameNode
   boolean shouldCheckpointAtStartup() {
     FSImage fsImage = getFSImage();
     if(isRole(NamenodeRole.CHECKPOINT)) {
-      assert fsImage.getNumStorageDirs() > 0;
-      return ! fsImage.getStorageDir(0).getVersionFile().exists();
+      assert fsImage.getStorage().getNumStorageDirs() > 0;
+      return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
     }
     if(namesystem == null || namesystem.dir == null || getFSImage() == null)
       return true;
@@ -291,14 +291,14 @@ public class BackupNode extends NameNode
    * @throws IOException
    */
   private void registerWith(NamespaceInfo nsInfo) throws IOException {
-    BackupStorage bnImage = (BackupStorage)getFSImage();
+    BackupImage bnImage = (BackupImage)getFSImage();
     // verify namespaceID
-    if(bnImage.getNamespaceID() == 0) // new backup storage
-      bnImage.setStorageInfo(nsInfo);
-    else if(bnImage.getNamespaceID() != nsInfo.getNamespaceID())
+    if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
+      bnImage.getStorage().setStorageInfo(nsInfo);
+    else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
       throw new IOException("Incompatible namespaceIDs"
           + ": active node namespaceID = " + nsInfo.getNamespaceID() 
-          + "; backup node namespaceID = " + bnImage.getNamespaceID());
+          + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
 
     setRegistration();
     NamenodeRegistration nnReg = null;
@@ -333,7 +333,7 @@ public class BackupNode extends NameNode
    * @throws IOException
    */
   void resetNamespace() throws IOException {
-    ((BackupStorage)getFSImage()).reset();
+    ((BackupImage)getFSImage()).reset();
   }
 
   /**
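
registerWith now routes the namespaceID handshake through getStorage(): a freshly formatted backup reports namespaceID 0 and adopts the active node's namespace info, while any non-zero mismatch is fatal. A small illustrative reduction of that logic follows (toy fields, not the real NamespaceInfo/NNStorage):

import java.io.IOException;

// Illustrative reduction of the namespaceID handshake in
// BackupNode.registerWith (hunk above): 0 means "new backup storage",
// anything else must match the active namenode exactly.
public class NamespaceHandshakeSketch {
  private int namespaceID;   // 0 until first registration

  void registerWith(int activeNamespaceID) throws IOException {
    if (namespaceID == 0) {                      // new backup storage: adopt
      namespaceID = activeNamespaceID;
    } else if (namespaceID != activeNamespaceID) {
      throw new IOException("Incompatible namespaceIDs"
          + ": active node namespaceID = " + activeNamespaceID
          + "; backup node namespaceID = " + namespaceID);
    }
  }

  public static void main(String[] args) throws IOException {
    NamespaceHandshakeSketch backup = new NamespaceHandshakeSketch();
    backup.registerWith(271828);   // first contact: adopt the active ID
    backup.registerWith(271828);   // same ID: fine
    backup.registerWith(314159);   // mismatch: throws IOException
  }
}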

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Wed Feb  2 02:05:18 2011
@@ -41,10 +41,10 @@ public class CheckpointSignature extends
   public CheckpointSignature() {}
 
   CheckpointSignature(FSImage fsImage) {
-    super(fsImage);
+    super(fsImage.getStorage());
     editsTime = fsImage.getEditLog().getFsEditTime();
-    checkpointTime = fsImage.getCheckpointTime();
-    imageDigest = fsImage.imageDigest;
+    checkpointTime = fsImage.getStorage().getCheckpointTime();
+    imageDigest = fsImage.getStorage().getImageDigest();
   }
 
   CheckpointSignature(String str) {
@@ -76,18 +76,21 @@ public class CheckpointSignature extends
   }
 
   void validateStorageInfo(FSImage si) throws IOException {
-    if(layoutVersion != si.layoutVersion
-        || namespaceID != si.namespaceID || cTime != si.cTime
-        || checkpointTime != si.checkpointTime ||
-        !imageDigest.equals(si.imageDigest)) {
+    if(layoutVersion != si.getStorage().layoutVersion
+       || namespaceID != si.getStorage().namespaceID 
+       || cTime != si.getStorage().cTime
+       || checkpointTime != si.getStorage().getCheckpointTime() ||
+       !imageDigest.equals(si.getStorage().getImageDigest())) {
       // checkpointTime can change when the image is saved - do not compare
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
           + " cTime = " + cTime + "; checkpointTime = " + checkpointTime
           + " ; imageDigest = " + imageDigest
           + ".\nExpecting respectively: "
-          + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
-          + "; " + si.checkpointTime + "; " + si.imageDigest);
+          + si.getStorage().layoutVersion + "; " 
+          + si.getStorage().namespaceID + "; " + si.getStorage().cTime
+          + "; " + si.getStorage().getCheckpointTime() + "; " 
+          + si.getStorage().getImageDigest());
     }
   }
 

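validateStorageInfo now compares the signature against the image's NNStorage rather than against FSImage itself. The shape of the check is simple enough to show in isolation; the sketch below is a hypothetical reduction with only three of the compared fields, not the real CheckpointSignature.

import java.io.IOException;

// Hypothetical reduction of CheckpointSignature.validateStorageInfo:
// every field of the signature must match the storage it came from,
// otherwise the checkpoint is rejected with a descriptive IOException.
public class SignatureCheckSketch {
  static class StorageFields {
    int layoutVersion;
    int namespaceID;
    long cTime;
  }

  static void validate(StorageFields sig, StorageFields storage)
      throws IOException {
    if (sig.layoutVersion != storage.layoutVersion
        || sig.namespaceID != storage.namespaceID
        || sig.cTime != storage.cTime) {
      throw new IOException("Inconsistent checkpoint fields.\n"
          + "LV = " + sig.layoutVersion + " namespaceID = " + sig.namespaceID
          + " cTime = " + sig.cTime + ".\nExpecting respectively: "
          + storage.layoutVersion + "; " + storage.namespaceID + "; "
          + storage.cTime);
    }
  }

  public static void main(String[] args) throws IOException {
    StorageFields a = new StorageFields();
    StorageFields b = new StorageFields();
    b.cTime = 1L;      // introduce a mismatch
    validate(a, b);    // throws: Inconsistent checkpoint fields
  }
}
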
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Wed Feb  2 02:05:18 2011
@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -63,8 +63,8 @@ class Checkpointer extends Daemon {
 
   private String infoBindAddress;
 
-  private BackupStorage getFSImage() {
-    return (BackupStorage)backupNode.getFSImage();
+  private BackupImage getFSImage() {
+    return (BackupImage)backupNode.getFSImage();
   }
 
   private NamenodeProtocol getNamenode(){
@@ -178,8 +178,8 @@ class Checkpointer extends Daemon {
   private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
     // Retrieve image file
     String fileid = "getimage=1";
-    Collection<File> list = getFSImage().getFiles(NameNodeFile.IMAGE,
-        NameNodeDirType.IMAGE);
+    Collection<File> list = getFSImage()
+      .getStorage().getFiles(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
     File[] files = list.toArray(new File[list.size()]);
     assert files.length > 0 : "No checkpoint targets.";
     String nnHttpAddr = backupNode.nnHttpAddress;
@@ -189,7 +189,8 @@ class Checkpointer extends Daemon {
 
     // Retrieve edits file
     fileid = "getedit=1";
-    list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
+    list = getFSImage()
+      .getStorage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
     files = list.toArray(new File[list.size()]);
     assert files.length > 0 : "No checkpoint targets.";
     TransferFsImage.getFileClient(nnHttpAddr, fileid, files, false);
@@ -207,7 +208,7 @@ class Checkpointer extends Daemon {
     String fileid = "putimage=1&port=" + httpPort +
       "&machine=" + infoBindAddress +
       "&token=" + sig.toString() +
-      "&newChecksum=" + getFSImage().imageDigest.toString();
+      "&newChecksum=" + getFSImage().getStorage().getImageDigest().toString();
     LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
     TransferFsImage.getFileClient(backupNode.nnHttpAddress, 
         fileid, (File[])null, false);
@@ -246,7 +247,7 @@ class Checkpointer extends Daemon {
       downloadCheckpoint(sig);
     }
 
-    BackupStorage bnImage = getFSImage();
+    BackupImage bnImage = getFSImage();
     bnImage.loadCheckpoint(sig);
     sig.validateStorageInfo(bnImage);
     bnImage.saveCheckpoint();
@@ -262,6 +263,6 @@ class Checkpointer extends Daemon {
         getFSImage().getEditLog().close();
     LOG.info("Checkpoint completed in "
         + (now() - startTime)/1000 + " seconds."
-        + " New Image Size: " + bnImage.getFsImageName().length());
+        + " New Image Size: " + bnImage.getStorage().getFsImageName().length());
   }
 }
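
The checkpointer drives image transfer with plain query strings on the namenode's image servlet (getimage=1, getedit=1, putimage=1), the last now fetching the digest through getStorage(). Below is a sketch of how that putimage request line is assembled; every value is a placeholder, and the actual transfer happens through TransferFsImage.getFileClient.

// Sketch of the putimage query string built in the hunk above. The values
// below are placeholders; in the real code they come from the HTTP server
// port, the bind address, the CheckpointSignature token, and the image
// digest held by NNStorage.
public class PutImageUrlSketch {
  public static void main(String[] args) {
    int httpPort = 50105;                        // placeholder port
    String infoBindAddress = "backup.example";   // placeholder host
    String token = "<signature-token>";          // CheckpointSignature.toString()
    String digest = "<md5-digest>";              // NNStorage image digest

    String fileid = "putimage=1&port=" + httpPort
        + "&machine=" + infoBindAddress
        + "&token=" + token
        + "&newChecksum=" + digest;
    System.out.println("Posted URL " + "<nn-http-address>" + fileid);
  }
}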

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Feb  2 02:05:18 2011
@@ -141,8 +141,8 @@ class FSDirectory implements Closeable {
       throws IOException {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
-      fsImage.setStorageDirectories(dataDirs, editsDirs);
-      fsImage.format();
+      fsImage.getStorage().setStorageDirectories(dataDirs, editsDirs);
+      fsImage.getStorage().format();
       startOpt = StartupOption.REGULAR;
     }
     try {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Feb  2 02:05:18 2011
@@ -36,11 +36,11 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
 import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -56,7 +56,7 @@ import static org.apache.hadoop.hdfs.ser
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSEditLog {
+public class FSEditLog implements NNStorageListener {
 
   static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
       " File system changes are not persistent. No journal streams.";
@@ -66,7 +66,6 @@ public class FSEditLog {
   private volatile int sizeOutputFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
-  private FSImage fsimage = null;
 
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
@@ -89,6 +88,8 @@ public class FSEditLog {
   private long totalTimeTransactions;  // total time for all transactions
   private NameNodeMetrics metrics;
 
+  private NNStorage storage;
+
   private static class TransactionId {
     public long txid;
 
@@ -104,23 +105,24 @@ public class FSEditLog {
     }
   };
 
-  FSEditLog(FSImage image) {
-    fsimage = image;
+  FSEditLog(NNStorage storage) {
     isSyncRunning = false;
+    this.storage = storage;
+    this.storage.registerListener(this);
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
   }
   
   private File getEditFile(StorageDirectory sd) {
-    return fsimage.getEditFile(sd);
+    return storage.getEditFile(sd);
   }
   
   private File getEditNewFile(StorageDirectory sd) {
-    return fsimage.getEditNewFile(sd);
+    return storage.getEditNewFile(sd);
   }
   
   private int getNumEditsDirs() {
-   return fsimage.getNumStorageDirs(NameNodeDirType.EDITS);
+   return storage.getNumStorageDirs(NameNodeDirType.EDITS);
   }
 
   synchronized int getNumEditStreams() {
@@ -151,8 +153,8 @@ public class FSEditLog {
       editStreams = new ArrayList<EditLogOutputStream>();
     
     ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    for (Iterator<StorageDirectory> it 
+         = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       StorageDirectory sd = it.next();
       File eFile = getEditFile(sd);
       try {
@@ -166,7 +168,7 @@ public class FSEditLog {
       }
     }
     
-    if(al != null) fsimage.processIOError(al, false);
+    if(al != null) storage.reportErrorsOnDirectories(al);
   }
   
   
@@ -210,7 +212,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
     editStreams.clear();
   }
 
@@ -228,64 +230,26 @@ public class FSEditLog {
 
   /**
    * The specified streams have IO errors. Close and remove them.
-   * If propagate is true - close related StorageDirectories.
-   * (is called with propagate value true from everywhere
-   *  except fsimage.processIOError)
-   */
-  synchronized void processIOError(
-      List<EditLogOutputStream> errorStreams,
-      boolean propagate) {
-    
+   */
+  synchronized
+  void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
     if (errorStreams == null || errorStreams.size() == 0) {
       return;                       // nothing to do
     }
-
-    String lsd = fsimage.listStorageDirectories();
-    LOG.info("current list of storage dirs:" + lsd);
-
-    ArrayList<StorageDirectory> al = null;
-    for (EditLogOutputStream eStream : errorStreams) {
-      LOG.error("Unable to log edits to " + eStream.getName()
-          + "; removing it");     
-
-      StorageDirectory storageDir;
-      if(propagate && eStream.getType() == JournalType.FILE && //find SD
-          (storageDir = getStorage(eStream)) != null) {
-        LOG.info("about to remove corresponding storage:" 
-            + storageDir.getRoot().getAbsolutePath());
-        // remove corresponding storage dir
-        if(al == null) al = new ArrayList<StorageDirectory>(1);
-        al.add(storageDir);
+    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
+    for (EditLogOutputStream e : errorStreams) {
+      if (e.getType() == JournalType.FILE) {
+        errorDirs.add(getStorageDirectoryForStream(e));
+      } else {
+        disableStream(e);
       }
-      Iterator<EditLogOutputStream> ies = editStreams.iterator();
-      while (ies.hasNext()) {
-        EditLogOutputStream es = ies.next();
-        if (es == eStream) {  
-          try { eStream.close(); } catch (IOException e) {
-            // nothing to do.
-            LOG.warn("Failed to close eStream " + eStream.getName()
-                + " before removing it (might be ok)");
-          }
-          ies.remove();
-          break;
-        }
-      } 
-    }
-    
-    if (editStreams == null || editStreams.size() <= 0) {
-      String msg = "Fatal Error: All storage directories are inaccessible.";
-      LOG.fatal(msg, new IOException(msg)); 
-      Runtime.getRuntime().exit(-1);
     }
 
-    // removed failed SDs
-    if(propagate && al != null) fsimage.processIOError(al, false);
-    
-    //for the rest of the streams
-    if(propagate) incrementCheckpointTime();
-    
-    lsd = fsimage.listStorageDirectories();
-    LOG.info("at the end current list of storage dirs:" + lsd);
+    try {
+      storage.reportErrorsOnDirectories(errorDirs);
+    } catch (IOException ioe) {
+      LOG.error("Problem erroring streams " + ioe);
+    }
   }
 
 
@@ -298,7 +262,7 @@ public class FSEditLog {
     String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
     .getParentFile().getParentFile().getAbsolutePath();
 
-    Iterator<StorageDirectory> it = fsimage.dirIterator(); 
+    Iterator<StorageDirectory> it = storage.dirIterator(); 
     while (it.hasNext()) {
       StorageDirectory sd = it.next();
       LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());

@@ -355,7 +319,7 @@ public class FSEditLog {
           errorStreams.add(eStream);
         }
       }
-      processIOError(errorStreams, true);
+      disableAndReportErrorOnStreams(errorStreams);
       recordTransaction(start);
       
       // check if it is time to schedule an automatic sync
@@ -547,7 +511,7 @@ public class FSEditLog {
         }
       }
       long elapsed = now() - start;
-      processIOError(errorStreams, true);
+      disableAndReportErrorOnStreams(errorStreams);
   
       if (metrics != null) // Metrics non-null only when used inside name node
         metrics.syncs.inc(elapsed);
@@ -814,7 +778,7 @@ public class FSEditLog {
         al.add(es);
       }
     }
-    if(al!=null) processIOError(al, true);
+    if(al!=null) disableAndReportErrorOnStreams(al);
     return size;
   }
   
@@ -823,7 +787,7 @@ public class FSEditLog {
    */
   synchronized void rollEditLog() throws IOException {
     waitForSyncToFinish();
-    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(!it.hasNext()) 
       return;
     //
@@ -841,7 +805,7 @@ public class FSEditLog {
       return; // nothing to do, edits.new exists!
 
     // check if any of failed storage is now available and put it back
-    fsimage.attemptRestoreRemovedStorage(false);
+    storage.attemptRestoreRemovedStorage(false);
 
     divertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
@@ -863,7 +827,7 @@ public class FSEditLog {
     EditStreamIterator itE = 
       (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
     Iterator<StorageDirectory> itD = 
-      fsimage.dirIterator(NameNodeDirType.EDITS);
+      storage.dirIterator(NameNodeDirType.EDITS);
     while(itE.hasNext() && itD.hasNext()) {
       EditLogOutputStream eStream = itE.next();
       StorageDirectory sd = itD.next();
@@ -885,7 +849,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   /**
@@ -929,7 +893,7 @@ public class FSEditLog {
     EditStreamIterator itE = 
       (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
     Iterator<StorageDirectory> itD = 
-      fsimage.dirIterator(NameNodeDirType.EDITS);
+      storage.dirIterator(NameNodeDirType.EDITS);
     while(itE.hasNext() && itD.hasNext()) {
       EditLogOutputStream eStream = itE.next();
       StorageDirectory sd = itD.next();
@@ -964,7 +928,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   /**
@@ -973,7 +937,7 @@ public class FSEditLog {
   synchronized File getFsEditName() {
     StorageDirectory sd = null;   
     for (Iterator<StorageDirectory> it = 
-      fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       sd = it.next();   
       if(sd.getRoot().canRead())
         return getEditFile(sd);
@@ -985,7 +949,7 @@ public class FSEditLog {
    * Returns the timestamp of the edit log
    */
   synchronized long getFsEditTime() {
-    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(it.hasNext())
       return getEditFile(it.next()).lastModified();
     return 0;
@@ -1052,7 +1016,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
     recordTransaction(start);
   }
 
@@ -1126,8 +1090,8 @@ public class FSEditLog {
   }
 
   void incrementCheckpointTime() {
-    fsimage.incrementCheckpointTime();
-    Writable[] args = {new LongWritable(fsimage.getCheckpointTime())};
+    storage.incrementCheckpointTime();
+    Writable[] args = {new LongWritable(storage.getCheckpointTime())};
     logEdit(OP_CHECKPOINT_TIME, args);
   }
 
@@ -1148,7 +1112,7 @@ public class FSEditLog {
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   synchronized boolean checkBackupRegistration(
@@ -1175,7 +1139,7 @@ public class FSEditLog {
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
     return regAllowed;
   }
   
@@ -1186,4 +1150,80 @@ public class FSEditLog {
     }
     return new BytesWritable(bytes);
   }
+
+  /**
+   * Get the StorageDirectory for a stream
+   * @param es Stream whose StorageDirectory we wish to know
+   * @return the matching StorageDirectory
+   */
+  StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
+    String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
+
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      FSNamesystem.LOG.info("comparing: " + parentStorageDir 
+                            + " and " + sd.getRoot().getAbsolutePath()); 
+      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+        return sd;
+    }
+    return null;
+  }
+
+  private synchronized void disableStream(EditLogOutputStream stream) {
+    try { stream.close(); } catch (IOException e) {
+      // nothing to do.
+      LOG.warn("Failed to close eStream " + stream.getName()
+               + " before removing it (might be ok)");
+    }
+    editStreams.remove(stream);
+
+    if (editStreams.size() <= 0) {
+      String msg = "Fatal Error: All storage directories are inaccessible.";
+      LOG.fatal(msg, new IOException(msg));
+      Runtime.getRuntime().exit(-1);
+    }
+  }
+
+  /**
+   * Error handling on a storage directory: disable every edit stream
+   * backed by the failed directory.
+   */
+  // NNStorageListener Interface
+  @Override // NNStorageListener
+  public synchronized void errorOccurred(StorageDirectory sd)
+      throws IOException {
+    ArrayList<EditLogOutputStream> errorStreams
+      = new ArrayList<EditLogOutputStream>();
+
+    for (EditLogOutputStream eStream : editStreams) {
+      LOG.error("Unable to log edits to " + eStream.getName()
+                + "; removing it");
+
+      StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
+      if (sd == streamStorageDir) {
+        errorStreams.add(eStream);
+      }
+    }
+
+    for (EditLogOutputStream eStream : errorStreams) {
+      disableStream(eStream);
+    }
+  }
+
+  @Override // NNStorageListener
+  public synchronized void formatOccurred(StorageDirectory sd)
+      throws IOException {
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      createEditLogFile(NNStorage.getStorageFile(sd, NameNodeFile.EDITS));
+    }
+  };
+
+  @Override // NNStorageListener
+  public synchronized void directoryAvailable(StorageDirectory sd)
+      throws IOException {
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      File eFile = getEditFile(sd);
+      addNewEditLogStream(eFile);
+    }
+  }
 }
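
The largest behavioral change in FSEditLog is the new NNStorageListener contract: the edit log registers itself with NNStorage in its constructor, and directory failures are pushed to errorOccurred rather than pulled through the old processIOError(..., propagate) path. Below is a minimal, self-contained sketch of that listener wiring; the types are toys, and only the errorOccurred callback of the real three-method interface is modeled.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Toy version of the NNStorage/NNStorageListener wiring introduced in this
// patch: the storage keeps a list of listeners and notifies each of them
// when a directory fails, just as NNStorage.reportErrorsOnDirectories
// drives FSEditLog.errorOccurred above. Types here are illustrative.
public class ListenerWiringSketch {
  interface StorageListener {
    void errorOccurred(String dir) throws IOException;
  }

  static class ToyStorage {
    private final List<StorageListener> listeners = new ArrayList<>();

    void registerListener(StorageListener l) { listeners.add(l); }

    void reportErrorOnDirectory(String dir) throws IOException {
      for (StorageListener l : listeners) {
        l.errorOccurred(dir);   // push the failure to every subscriber
      }
    }
  }

  static class ToyEditLog implements StorageListener {
    ToyEditLog(ToyStorage storage) {
      storage.registerListener(this);   // mirrors FSEditLog's constructor
    }

    @Override
    public void errorOccurred(String dir) {
      System.out.println("disabling edit streams under " + dir);
    }
  }

  public static void main(String[] args) throws IOException {
    ToyStorage storage = new ToyStorage();
    new ToyEditLog(storage);
    storage.reportErrorOnDirectory("/data/dfs/name");
  }
}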


