hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1324567 [2/4] - in /hadoop/common/branches/HDFS-3042/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/contrib/ hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ hadoop-hdfs/sr...
Date Wed, 11 Apr 2012 05:47:46 GMT
Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Apr 11 05:47:40 2012
@@ -417,7 +417,9 @@ class BPServiceActor implements Runnable
   
   
   HeartbeatResponse sendHeartBeat() throws IOException {
-    LOG.info("heartbeat: " + this);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending heartbeat from service actor: " + this);
+    }
     // reports number of failed volumes
     StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
         false,

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Apr 11 05:47:40 2012
@@ -213,19 +213,21 @@ public class BackupImage extends FSImage
         LOG.debug("data:" + StringUtils.byteToHexString(data));
       }
 
-      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+      FSEditLogLoader logLoader =
+          new FSEditLogLoader(namesystem, lastAppliedTxId);
       int logVersion = storage.getLayoutVersion();
       backupInputStream.setBytes(data, logVersion);
 
-      long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, 
-                                                true, lastAppliedTxId + 1);
-      if (numLoaded != numTxns) {
+      long numTxnsAdvanced = logLoader.loadEditRecords(logVersion, 
+          backupInputStream, true, lastAppliedTxId + 1, null);
+      if (numTxnsAdvanced != numTxns) {
         throw new IOException("Batch of txns starting at txnid " +
             firstTxId + " was supposed to contain " + numTxns +
-            " transactions but only was able to apply " + numLoaded);
+            " transactions, but we were only able to advance by " +
+            numTxnsAdvanced);
       }
-      lastAppliedTxId += numTxns;
-      
+      lastAppliedTxId = logLoader.getLastAppliedTxId();
+
       namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
     } finally {
       backupInputStream.clear();
@@ -275,7 +277,7 @@ public class BackupImage extends FSImage
           editStreams.add(s);
         }
       }
-      loadEdits(editStreams, namesystem);
+      loadEdits(editStreams, namesystem, null);
     }
     
     // now, need to load the in-progress file
@@ -309,12 +311,11 @@ public class BackupImage extends FSImage
         LOG.info("Going to finish converging with remaining " + remainingTxns
             + " txns from in-progress stream " + stream);
         
-        FSEditLogLoader loader = new FSEditLogLoader(namesystem);
-        long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
-        lastAppliedTxId += numLoaded;
-        assert numLoaded == remainingTxns :
-          "expected to load " + remainingTxns + " but loaded " +
-          numLoaded + " from " + stream;
+        FSEditLogLoader loader =
+            new FSEditLogLoader(namesystem, lastAppliedTxId);
+        loader.loadFSEdits(stream, lastAppliedTxId + 1, null);
+        lastAppliedTxId = loader.getLastAppliedTxId();
+        assert lastAppliedTxId == getEditLog().getLastWrittenTxId();
       } finally {
         FSEditLog.closeAllStreams(editStreams);
       }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Wed Apr 11 05:47:40 2012
@@ -17,15 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.collect.ComparisonChain;
 
@@ -33,16 +29,15 @@ import com.google.common.collect.Compari
  * A unique signature intended to identify checkpoint transactions.
  */
 @InterfaceAudience.Private
-public class CheckpointSignature extends StorageInfo 
-                      implements WritableComparable<CheckpointSignature> {
+public class CheckpointSignature extends StorageInfo
+    implements Comparable<CheckpointSignature> { 
+
   private static final String FIELD_SEPARATOR = ":";
   private static final int NUM_FIELDS = 7;
   String blockpoolID = "";
   long mostRecentCheckpointTxId;
   long curSegmentTxId;
 
-  public CheckpointSignature() {}
-
   CheckpointSignature(FSImage fsImage) {
     super(fsImage.getStorage());
     blockpoolID = fsImage.getBlockPoolID();
@@ -162,21 +157,4 @@ public class CheckpointSignature extends
             (int)(cTime ^ mostRecentCheckpointTxId ^ curSegmentTxId)
             ^ clusterID.hashCode() ^ blockpoolID.hashCode();
   }
-
-  /////////////////////////////////////////////////
-  // Writable
-  /////////////////////////////////////////////////
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    WritableUtils.writeString(out, blockpoolID);
-    out.writeLong(mostRecentCheckpointTxId);
-    out.writeLong(curSegmentTxId);
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    blockpoolID = WritableUtils.readString(in);
-    mostRecentCheckpointTxId = in.readLong();
-    curSegmentTxId = in.readLong();
-  }
 }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Wed Apr 11 05:47:40 2012
@@ -292,6 +292,6 @@ class Checkpointer extends Daemon {
     }
     LOG.info("Checkpointer about to load edits from " +
         editsStreams.size() + " stream(s).");
-    dstImage.loadEdits(editsStreams, dstNamesystem);
+    dstImage.loadEdits(editsStreams, dstNamesystem, null);
   }
 }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Wed Apr 11 05:47:40 2012
@@ -70,21 +70,25 @@ class EditLogBackupInputStream extends E
     reader = null;
   }
 
-  @Override // JournalStream
+  @Override
   public String getName() {
     return address;
   }
 
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.BACKUP;
-  }
-
   @Override
-  public FSEditLogOp readOp() throws IOException {
+  protected FSEditLogOp nextOp() throws IOException {
     Preconditions.checkState(reader != null,
         "Must call setBytes() before readOp()");
-    return reader.readOp();
+    return reader.readOp(false);
+  }
+
+  @Override
+  protected FSEditLogOp nextValidOp() {
+    try {
+      return reader.readOp(true);
+    } catch (IOException e) {
+      throw new RuntimeException("got unexpected IOException " + e, e);
+    }
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Wed Apr 11 05:47:40 2012
@@ -89,24 +89,6 @@ public class EditLogFileInputStream exte
     this.isInProgress = isInProgress;
   }
 
-  /**
-   * Skip over a number of transactions. Subsequent calls to
-   * {@link EditLogFileInputStream#readOp()} will begin after these skipped
-   * transactions. If more transactions are requested to be skipped than remain
-   * in the edit log, all edit log ops in the log will be skipped and subsequent
-   * calls to {@link EditLogInputStream#readOp} will return null.
-   * 
-   * @param transactionsToSkip number of transactions to skip over.
-   * @throws IOException if there's an error while reading an operation
-   */
-  public void skipTransactions(long transactionsToSkip) throws IOException {
-    assert firstTxId != HdfsConstants.INVALID_TXID &&
-        lastTxId != HdfsConstants.INVALID_TXID;
-    for (long i = 0; i < transactionsToSkip; i++) {
-      reader.readOp();
-    }
-  }
-
   @Override
   public long getFirstTxId() throws IOException {
     return firstTxId;
@@ -117,19 +99,23 @@ public class EditLogFileInputStream exte
     return lastTxId;
   }
 
-  @Override // JournalStream
+  @Override
   public String getName() {
     return file.getPath();
   }
 
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.FILE;
+  @Override
+  protected FSEditLogOp nextOp() throws IOException {
+    return reader.readOp(false);
   }
-
+  
   @Override
-  public FSEditLogOp readOp() throws IOException {
-    return reader.readOp();
+  protected FSEditLogOp nextValidOp() {
+    try {
+      return reader.readOp(true);
+    } catch (IOException e) {
+      return null;
+    }
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Wed Apr 11 05:47:40 2012
@@ -34,7 +34,14 @@ import org.apache.hadoop.classification.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public abstract class EditLogInputStream implements JournalStream, Closeable {
+public abstract class EditLogInputStream implements Closeable {
+  private FSEditLogOp cachedOp = null; 
+  
+  /** 
+   * @return the name of the EditLogInputStream
+   */
+  public abstract String getName();
+  
   /** 
    * @return the first transaction which will be found in this stream
    */
@@ -57,9 +64,82 @@ public abstract class EditLogInputStream
    * @return an operation from the stream or null if at end of stream
    * @throws IOException if there is an error reading from the stream
    */
-  public abstract FSEditLogOp readOp() throws IOException;
+  public FSEditLogOp readOp() throws IOException {
+    FSEditLogOp ret;
+    if (cachedOp != null) {
+      ret = cachedOp;
+      cachedOp = null;
+      return ret;
+    }
+    return nextOp();
+  }
 
   /** 
+   * Position the stream so that a valid operation can be read from it with
+   * readOp().
+   * 
+   * This method can be used to skip over corrupted sections of edit logs.
+   */
+  public void resync() throws IOException {
+    if (cachedOp != null) {
+      return;
+    }
+    cachedOp = nextValidOp();
+  }
+  
+  /** 
+   * Get the next operation from the stream storage.
+   * 
+   * @return an operation from the stream or null if at end of stream
+   * @throws IOException if there is an error reading from the stream
+   */
+  protected abstract FSEditLogOp nextOp() throws IOException;
+  
+  /** 
+   * Get the next valid operation from the stream storage.
+   * 
+   * This is exactly like nextOp, except that we attempt to skip over damaged
+   * parts of the edit log.
+   * 
+   * @return an operation from the stream or null if at end of stream
+   */
+  protected FSEditLogOp nextValidOp() {
+    // This is a trivial implementation which just assumes that any errors mean
+    // that there is nothing more of value in the log.  Subclasses that support
+    // error recovery will want to override this.
+    try {
+      return nextOp();
+    } catch (IOException e) {
+      return null;
+    }
+  }
+  
+  /** 
+   * Skip edit log operations up to a given transaction ID, or until the
+   * end of the edit log is reached.
+   *
+   * After this function returns, the next call to readOp will return either
+   * end-of-file (null) or a transaction with a txid equal to or higher than
+   * the one we asked for.
+   *
+   * @param txid    The transaction ID to read up until.
+   * @return        true if we found a transaction ID greater than or equal
+   *                to 'txid' in the log; false if we hit the end of the log.
+   */
+  public boolean skipUntil(long txid) throws IOException {
+    while (true) {
+      FSEditLogOp op = readOp();
+      if (op == null) {
+        return false;
+      }
+      if (op.getTransactionId() >= txid) {
+        cachedOp = op;
+        return true;
+      }
+    }
+  }
+  
+  /** 
    * Get the layout version of the data in the stream.
    * @return the layout version of the ops in the stream.
    * @throws IOException if there is an error reading the version

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Wed Apr 11 05:47:40 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.io.Closeable;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
@@ -30,7 +31,7 @@ import org.apache.hadoop.classification.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public abstract class EditLogOutputStream {
+public abstract class EditLogOutputStream implements Closeable {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Apr 11 05:47:40 2012
@@ -127,6 +127,14 @@ public class FSEditLog  {
   private Configuration conf;
   
   private List<URI> editsDirs;
+
+  private ThreadLocal<OpInstanceCache> cache =
+      new ThreadLocal<OpInstanceCache>() {
+    @Override
+    protected OpInstanceCache initialValue() {
+      return new OpInstanceCache();
+    }
+  };
   
   /**
    * The edit directories that are shared between primary and secondary.
@@ -596,7 +604,7 @@ public class FSEditLog  {
    * Records the block locations of the last block.
    */
   public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
-    AddOp op = AddOp.getInstance()
+    AddOp op = AddOp.getInstance(cache.get())
       .setPath(path)
       .setReplication(newNode.getReplication())
       .setModificationTime(newNode.getModificationTime())
@@ -614,7 +622,7 @@ public class FSEditLog  {
    * Add close lease record to edit log.
    */
   public void logCloseFile(String path, INodeFile newNode) {
-    CloseOp op = CloseOp.getInstance()
+    CloseOp op = CloseOp.getInstance(cache.get())
       .setPath(path)
       .setReplication(newNode.getReplication())
       .setModificationTime(newNode.getModificationTime())
@@ -627,7 +635,7 @@ public class FSEditLog  {
   }
   
   public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
-    UpdateBlocksOp op = UpdateBlocksOp.getInstance()
+    UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
       .setPath(path)
       .setBlocks(file.getBlocks());
     logEdit(op);
@@ -637,7 +645,7 @@ public class FSEditLog  {
    * Add create directory record to edit log
    */
   public void logMkDir(String path, INode newNode) {
-    MkdirOp op = MkdirOp.getInstance()
+    MkdirOp op = MkdirOp.getInstance(cache.get())
       .setPath(path)
       .setTimestamp(newNode.getModificationTime())
       .setPermissionStatus(newNode.getPermissionStatus());
@@ -649,7 +657,7 @@ public class FSEditLog  {
    * TODO: use String parameters until just before writing to disk
    */
   void logRename(String src, String dst, long timestamp) {
-    RenameOldOp op = RenameOldOp.getInstance()
+    RenameOldOp op = RenameOldOp.getInstance(cache.get())
       .setSource(src)
       .setDestination(dst)
       .setTimestamp(timestamp);
@@ -660,7 +668,7 @@ public class FSEditLog  {
    * Add rename record to edit log
    */
   void logRename(String src, String dst, long timestamp, Options.Rename... options) {
-    RenameOp op = RenameOp.getInstance()
+    RenameOp op = RenameOp.getInstance(cache.get())
       .setSource(src)
       .setDestination(dst)
       .setTimestamp(timestamp)
@@ -672,7 +680,7 @@ public class FSEditLog  {
    * Add set replication record to edit log
    */
   void logSetReplication(String src, short replication) {
-    SetReplicationOp op = SetReplicationOp.getInstance()
+    SetReplicationOp op = SetReplicationOp.getInstance(cache.get())
       .setPath(src)
       .setReplication(replication);
     logEdit(op);
@@ -684,7 +692,7 @@ public class FSEditLog  {
    * @param quota the directory size limit
    */
   void logSetQuota(String src, long nsQuota, long dsQuota) {
-    SetQuotaOp op = SetQuotaOp.getInstance()
+    SetQuotaOp op = SetQuotaOp.getInstance(cache.get())
       .setSource(src)
       .setNSQuota(nsQuota)
       .setDSQuota(dsQuota);
@@ -693,7 +701,7 @@ public class FSEditLog  {
 
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
-    SetPermissionsOp op = SetPermissionsOp.getInstance()
+    SetPermissionsOp op = SetPermissionsOp.getInstance(cache.get())
       .setSource(src)
       .setPermissions(permissions);
     logEdit(op);
@@ -701,7 +709,7 @@ public class FSEditLog  {
 
   /**  Add set owner record to edit log */
   void logSetOwner(String src, String username, String groupname) {
-    SetOwnerOp op = SetOwnerOp.getInstance()
+    SetOwnerOp op = SetOwnerOp.getInstance(cache.get())
       .setSource(src)
       .setUser(username)
       .setGroup(groupname);
@@ -712,7 +720,7 @@ public class FSEditLog  {
    * concat(trg,src..) log
    */
   void logConcat(String trg, String [] srcs, long timestamp) {
-    ConcatDeleteOp op = ConcatDeleteOp.getInstance()
+    ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
       .setTarget(trg)
       .setSources(srcs)
       .setTimestamp(timestamp);
@@ -723,7 +731,7 @@ public class FSEditLog  {
    * Add delete file record to edit log
    */
   void logDelete(String src, long timestamp) {
-    DeleteOp op = DeleteOp.getInstance()
+    DeleteOp op = DeleteOp.getInstance(cache.get())
       .setPath(src)
       .setTimestamp(timestamp);
     logEdit(op);
@@ -733,7 +741,7 @@ public class FSEditLog  {
    * Add generation stamp record to edit log
    */
   void logGenerationStamp(long genstamp) {
-    SetGenstampOp op = SetGenstampOp.getInstance()
+    SetGenstampOp op = SetGenstampOp.getInstance(cache.get())
       .setGenerationStamp(genstamp);
     logEdit(op);
   }
@@ -742,7 +750,7 @@ public class FSEditLog  {
    * Add access time record to edit log
    */
   void logTimes(String src, long mtime, long atime) {
-    TimesOp op = TimesOp.getInstance()
+    TimesOp op = TimesOp.getInstance(cache.get())
       .setPath(src)
       .setModificationTime(mtime)
       .setAccessTime(atime);
@@ -754,7 +762,7 @@ public class FSEditLog  {
    */
   void logSymlink(String path, String value, long mtime, 
                   long atime, INodeSymlink node) {
-    SymlinkOp op = SymlinkOp.getInstance()
+    SymlinkOp op = SymlinkOp.getInstance(cache.get())
       .setPath(path)
       .setValue(value)
       .setModificationTime(mtime)
@@ -770,7 +778,7 @@ public class FSEditLog  {
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
+    GetDelegationTokenOp op = GetDelegationTokenOp.getInstance(cache.get())
       .setDelegationTokenIdentifier(id)
       .setExpiryTime(expiryTime);
     logEdit(op);
@@ -778,26 +786,26 @@ public class FSEditLog  {
   
   void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
+    RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance(cache.get())
       .setDelegationTokenIdentifier(id)
       .setExpiryTime(expiryTime);
     logEdit(op);
   }
   
   void logCancelDelegationToken(DelegationTokenIdentifier id) {
-    CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
+    CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance(cache.get())
       .setDelegationTokenIdentifier(id);
     logEdit(op);
   }
   
   void logUpdateMasterKey(DelegationKey key) {
-    UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
+    UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance(cache.get())
       .setDelegationKey(key);
     logEdit(op);
   }
 
   void logReassignLease(String leaseHolder, String src, String newHolder) {
-    ReassignLeaseOp op = ReassignLeaseOp.getInstance()
+    ReassignLeaseOp op = ReassignLeaseOp.getInstance(cache.get())
       .setLeaseHolder(leaseHolder)
       .setPath(src)
       .setNewHolder(newHolder);
@@ -896,7 +904,7 @@ public class FSEditLog  {
     state = State.IN_SEGMENT;
 
     if (writeHeaderTxn) {
-      logEdit(LogSegmentOp.getInstance(
+      logEdit(LogSegmentOp.getInstance(cache.get(),
           FSEditLogOpCodes.OP_START_LOG_SEGMENT));
       logSync();
     }
@@ -912,7 +920,7 @@ public class FSEditLog  {
         "Bad state: %s", state);
     
     if (writeEndTxn) {
-      logEdit(LogSegmentOp.getInstance(
+      logEdit(LogSegmentOp.getInstance(cache.get(), 
           FSEditLogOpCodes.OP_END_LOG_SEGMENT));
       logSync();
     }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Apr 11 05:47:40 2012
@@ -71,9 +71,11 @@ public class FSEditLogLoader {
   static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
   static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
   private final FSNamesystem fsNamesys;
-
-  public FSEditLogLoader(FSNamesystem fsNamesys) {
+  private long lastAppliedTxId;
+  
+  public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
     this.fsNamesys = fsNamesys;
+    this.lastAppliedTxId = lastAppliedTxId;
   }
   
   /**
@@ -81,32 +83,29 @@ public class FSEditLogLoader {
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
-      throws IOException {
-    long numEdits = 0;
+  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
+      MetaRecoveryContext recovery) throws IOException {
     int logVersion = edits.getVersion();
 
     fsNamesys.writeLock();
     try {
       long startTime = now();
-      numEdits = loadEditRecords(logVersion, edits, false, 
-                                 expectedStartingTxId);
+      long numEdits = loadEditRecords(logVersion, edits, false, 
+                                 expectedStartingTxId, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
           + " loaded in " + (now()-startTime)/1000 + " seconds.");
+      return numEdits;
     } finally {
       edits.close();
       fsNamesys.writeUnlock();
     }
-    
-    return numEdits;
   }
 
   long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
-                      long expectedStartingTxId)
-      throws IOException, EditLogInputException {
+                      long expectedStartingTxId, MetaRecoveryContext recovery)
+      throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
-    long numEdits = 0;
 
     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
       new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@@ -120,72 +119,99 @@ public class FSEditLogLoader {
 
     long recentOpcodeOffsets[] = new long[4];
     Arrays.fill(recentOpcodeOffsets, -1);
-
-    long txId = expectedStartingTxId - 1;
+    
+    long expectedTxId = expectedStartingTxId;
+    long numEdits = 0;
     long lastTxId = in.getLastTxId();
     long numTxns = (lastTxId - expectedStartingTxId) + 1;
-
     long lastLogTime = now();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("edit log length: " + in.length() + ", start txid: "
           + expectedStartingTxId + ", last txid: " + lastTxId);
     }
-
     try {
-      try {
-        while (true) {
+      while (true) {
+        try {
           FSEditLogOp op;
           try {
-            if ((op = in.readOp()) == null) {
+            op = in.readOp();
+            if (op == null) {
               break;
             }
-          } catch (IOException ioe) {
-            long badTxId = txId + 1; // because txId hasn't been incremented yet
-            String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, badTxId);
+          } catch (Throwable e) {
+            // Handle a problem with our input
+            check203UpgradeFailure(logVersion, e);
+            String errorMessage =
+              formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
             FSImage.LOG.error(errorMessage);
-            throw new EditLogInputException(errorMessage,
-                ioe, numEdits);
+            if (recovery == null) {
+               // We will only try to skip over problematic opcodes when in
+               // recovery mode.
+              throw new EditLogInputException(errorMessage, e, numEdits);
+            }
+            MetaRecoveryContext.editLogLoaderPrompt(
+                "We failed to read txId " + expectedTxId,
+                recovery, "skipping the bad section in the log");
+            in.resync();
+            continue;
           }
           recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
             in.getPosition();
           if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
-            long expectedTxId = txId + 1;
-            txId = op.txid;
-            if (txId != expectedTxId) {
-              throw new IOException("Expected transaction ID " +
-                  expectedTxId + " but got " + txId);
+            if (op.getTransactionId() > expectedTxId) { 
+              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
+                  "to be a gap in the edit log.  We expected txid " +
+                  expectedTxId + ", but got txid " +
+                  op.getTransactionId() + ".", recovery, "ignoring missing " +
+                  " transaction IDs");
+            } else if (op.getTransactionId() < expectedTxId) { 
+              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
+                  "to be an out-of-order edit in the edit log.  We " +
+                  "expected txid " + expectedTxId + ", but got txid " +
+                  op.getTransactionId() + ".", recovery,
+                  "skipping the out-of-order edit");
+              continue;
             }
           }
-
-          incrOpCount(op.opCode, opCounts);
           try {
             applyEditLogOp(op, fsDir, logVersion);
-          } catch (Throwable t) {
-            // Catch Throwable because in the case of a truly corrupt edits log, any
-            // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
-            String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, txId);
-            FSImage.LOG.error(errorMessage);
-            throw new IOException(errorMessage, t);
+          } catch (Throwable e) {
+            LOG.error("Encountered exception on operation " + op, e);
+            MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
+             "apply edit log operation " + op + ": error " +
+             e.getMessage(), recovery, "applying edits");
+          }
+          // Now that the operation has been successfully decoded and
+          // applied, update our bookkeeping.
+          incrOpCount(op.opCode, opCounts);
+          if (op.hasTransactionId()) {
+            lastAppliedTxId = op.getTransactionId();
+            expectedTxId = lastAppliedTxId + 1;
+          } else {
+            expectedTxId = lastAppliedTxId = expectedStartingTxId;
           }
-
           // log progress
-          if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
-            int percent = Math.round((float) txId / numTxns * 100);
-            LOG.info("replaying edit log: " + txId + "/" + numTxns
-                + " transactions completed. (" + percent + "%)");
-            lastLogTime = now();
+          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+            long now = now();
+            if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
+              int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
+              LOG.info("replaying edit log: " + lastAppliedTxId + "/" + numTxns
+                  + " transactions completed. (" + percent + "%)");
+              lastLogTime = now;
+            }
           }
-
           numEdits++;
+        } catch (MetaRecoveryContext.RequestStopException e) {
+          MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
+              in.getPosition() + "/"  + in.length());
+          break;
         }
-      } catch (IOException ex) {
-        check203UpgradeFailure(logVersion, ex);
-      } finally {
-        if(closeOnExit)
-          in.close();
       }
     } finally {
+      if(closeOnExit) {
+        in.close();
+      }
       fsDir.writeUnlock();
       fsNamesys.writeUnlock();
 
@@ -472,7 +498,7 @@ public class FSEditLogLoader {
       long recentOpcodeOffsets[], long txid) {
     StringBuilder sb = new StringBuilder();
     sb.append("Error replaying edit log at offset " + in.getPosition());
-    sb.append(" on transaction ID ").append(txid);
+    sb.append(".  Expected transaction ID was ").append(txid);
     if (recentOpcodeOffsets[0] != -1) {
       Arrays.sort(recentOpcodeOffsets);
       sb.append("\nRecent opcode offsets:");
@@ -519,7 +545,7 @@ public class FSEditLogLoader {
       if (oldBlock.getBlockId() != newBlock.getBlockId() ||
           (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && 
               !(isGenStampUpdate && isLastBlock))) {
-        throw new IOException("Mismatched block IDs or generation stamps, " + 
+        throw new IOException("Mismatched block IDs or generation stamps, " +
             "attempting to replace block " + oldBlock + " with " + newBlock +
             " as block # " + i + "/" + newBlocks.length + " of " +
             path);
@@ -605,7 +631,7 @@ public class FSEditLogLoader {
    * Throw appropriate exception during upgrade from 203, when editlog loading
    * could fail due to opcode conflicts.
    */
-  private void check203UpgradeFailure(int logVersion, IOException ex)
+  private void check203UpgradeFailure(int logVersion, Throwable e)
       throws IOException {
     // 0.20.203 version version has conflicting opcodes with the later releases.
     // The editlog must be emptied by restarting the namenode, before proceeding
@@ -616,9 +642,7 @@ public class FSEditLogLoader {
           + logVersion + " from release 0.20.203. Please go back to the old "
           + " release and restart the namenode. This empties the editlog "
           + " and saves the namespace. Resume the upgrade after this step.";
-      throw new IOException(msg, ex);
-    } else {
-      throw ex;
+      throw new IOException(msg, e);
     }
   }
   
@@ -643,14 +667,14 @@ public class FSEditLogLoader {
           break;
         }
         if (firstTxId == HdfsConstants.INVALID_TXID) {
-          firstTxId = op.txid;
+          firstTxId = op.getTransactionId();
         }
         if (lastTxId == HdfsConstants.INVALID_TXID
-            || op.txid == lastTxId + 1) {
-          lastTxId = op.txid;
+            || op.getTransactionId() == lastTxId + 1) {
+          lastTxId = op.getTransactionId();
         } else {
-          FSImage.LOG.error("Out of order txid found. Found " + op.txid 
-                            + ", expected " + (lastTxId + 1));
+          FSImage.LOG.error("Out of order txid found. Found " +
+            op.getTransactionId() + ", expected " + (lastTxId + 1));
           break;
         }
         numValid++;
@@ -743,4 +767,7 @@ public class FSEditLogLoader {
     }
   }
 
+  public long getLastAppliedTxId() {
+    return lastAppliedTxId;
+  }
 }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Wed Apr 11 05:47:40 2012
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Options.Rena
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -54,6 +56,8 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
 
+import com.google.common.base.Preconditions;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataInputStream;
@@ -74,42 +78,44 @@ public abstract class FSEditLogOp {
 
 
   @SuppressWarnings("deprecation")
-  private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
-    new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
-      @Override
-      protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
-        EnumMap<FSEditLogOpCodes, FSEditLogOp> instances 
-          = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
-        instances.put(OP_ADD, new AddOp());
-        instances.put(OP_CLOSE, new CloseOp());
-        instances.put(OP_SET_REPLICATION, new SetReplicationOp());
-        instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-        instances.put(OP_RENAME_OLD, new RenameOldOp());
-        instances.put(OP_DELETE, new DeleteOp());
-        instances.put(OP_MKDIR, new MkdirOp());
-        instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
-        instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-        instances.put(OP_SET_OWNER, new SetOwnerOp());
-        instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-        instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-        instances.put(OP_SET_QUOTA, new SetQuotaOp());
-        instances.put(OP_TIMES, new TimesOp());
-        instances.put(OP_SYMLINK, new SymlinkOp());
-        instances.put(OP_RENAME, new RenameOp());
-        instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-        instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-        instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-        instances.put(OP_CANCEL_DELEGATION_TOKEN, 
-                      new CancelDelegationTokenOp());
-        instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-        instances.put(OP_START_LOG_SEGMENT,
-                      new LogSegmentOp(OP_START_LOG_SEGMENT));
-        instances.put(OP_END_LOG_SEGMENT,
-                      new LogSegmentOp(OP_END_LOG_SEGMENT));
-        instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
-        return instances;
-      }
-  };
+  final public static class OpInstanceCache {
+    private EnumMap<FSEditLogOpCodes, FSEditLogOp> inst = 
+        new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
+    
+    public OpInstanceCache() {
+      inst.put(OP_ADD, new AddOp());
+      inst.put(OP_CLOSE, new CloseOp());
+      inst.put(OP_SET_REPLICATION, new SetReplicationOp());
+      inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+      inst.put(OP_RENAME_OLD, new RenameOldOp());
+      inst.put(OP_DELETE, new DeleteOp());
+      inst.put(OP_MKDIR, new MkdirOp());
+      inst.put(OP_SET_GENSTAMP, new SetGenstampOp());
+      inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+      inst.put(OP_SET_OWNER, new SetOwnerOp());
+      inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+      inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+      inst.put(OP_SET_QUOTA, new SetQuotaOp());
+      inst.put(OP_TIMES, new TimesOp());
+      inst.put(OP_SYMLINK, new SymlinkOp());
+      inst.put(OP_RENAME, new RenameOp());
+      inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+      inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+      inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+      inst.put(OP_CANCEL_DELEGATION_TOKEN, 
+                    new CancelDelegationTokenOp());
+      inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+      inst.put(OP_START_LOG_SEGMENT,
+                    new LogSegmentOp(OP_START_LOG_SEGMENT));
+      inst.put(OP_END_LOG_SEGMENT,
+                    new LogSegmentOp(OP_END_LOG_SEGMENT));
+      inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
+    }
+    
+    public FSEditLogOp get(FSEditLogOpCodes opcode) {
+      return inst.get(opcode);
+    }
+  }
 
   /**
    * Constructor for an EditLog Op. EditLog ops cannot be constructed
@@ -117,13 +123,22 @@ public abstract class FSEditLogOp {
    */
   private FSEditLogOp(FSEditLogOpCodes opCode) {
     this.opCode = opCode;
-    this.txid = 0;
+    this.txid = HdfsConstants.INVALID_TXID;
   }
 
   public long getTransactionId() {
+    Preconditions.checkState(txid != HdfsConstants.INVALID_TXID);
     return txid;
   }
 
+  public String getTransactionIdStr() {
+    return (txid == HdfsConstants.INVALID_TXID) ? "(none)" : "" + txid;
+  }
+  
+  public boolean hasTransactionId() {
+    return (txid != HdfsConstants.INVALID_TXID);
+  }
+
   public void setTransactionId(long txid) {
     this.txid = txid;
   }
@@ -373,8 +388,8 @@ public abstract class FSEditLogOp {
       super(OP_ADD);
     }
 
-    static AddOp getInstance() {
-      return (AddOp)opInstances.get().get(OP_ADD);
+    static AddOp getInstance(OpInstanceCache cache) {
+      return (AddOp)cache.get(OP_ADD);
     }
 
     public boolean shouldCompleteLastBlock() {
@@ -395,8 +410,8 @@ public abstract class FSEditLogOp {
       super(OP_CLOSE);
     }
 
-    static CloseOp getInstance() {
-      return (CloseOp)opInstances.get().get(OP_CLOSE);
+    static CloseOp getInstance(OpInstanceCache cache) {
+      return (CloseOp)cache.get(OP_CLOSE);
     }
 
     public boolean shouldCompleteLastBlock() {
@@ -420,9 +435,8 @@ public abstract class FSEditLogOp {
       super(OP_UPDATE_BLOCKS);
     }
     
-    static UpdateBlocksOp getInstance() {
-      return (UpdateBlocksOp)opInstances.get()
-        .get(OP_UPDATE_BLOCKS);
+    static UpdateBlocksOp getInstance(OpInstanceCache cache) {
+      return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
     }
     
     
@@ -500,9 +514,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_REPLICATION);
     }
 
-    static SetReplicationOp getInstance() {
-      return (SetReplicationOp)opInstances.get()
-        .get(OP_SET_REPLICATION);
+    static SetReplicationOp getInstance(OpInstanceCache cache) {
+      return (SetReplicationOp)cache.get(OP_SET_REPLICATION);
     }
 
     SetReplicationOp setPath(String path) {
@@ -571,9 +584,8 @@ public abstract class FSEditLogOp {
       super(OP_CONCAT_DELETE);
     }
 
-    static ConcatDeleteOp getInstance() {
-      return (ConcatDeleteOp)opInstances.get()
-        .get(OP_CONCAT_DELETE);
+    static ConcatDeleteOp getInstance(OpInstanceCache cache) {
+      return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE);
     }
 
     ConcatDeleteOp setTarget(String trg) {
@@ -697,9 +709,8 @@ public abstract class FSEditLogOp {
       super(OP_RENAME_OLD);
     }
 
-    static RenameOldOp getInstance() {
-      return (RenameOldOp)opInstances.get()
-        .get(OP_RENAME_OLD);
+    static RenameOldOp getInstance(OpInstanceCache cache) {
+      return (RenameOldOp)cache.get(OP_RENAME_OLD);
     }
 
     RenameOldOp setSource(String src) {
@@ -790,9 +801,8 @@ public abstract class FSEditLogOp {
       super(OP_DELETE);
     }
 
-    static DeleteOp getInstance() {
-      return (DeleteOp)opInstances.get()
-        .get(OP_DELETE);
+    static DeleteOp getInstance(OpInstanceCache cache) {
+      return (DeleteOp)cache.get(OP_DELETE);
     }
 
     DeleteOp setPath(String path) {
@@ -872,9 +882,8 @@ public abstract class FSEditLogOp {
       super(OP_MKDIR);
     }
     
-    static MkdirOp getInstance() {
-      return (MkdirOp)opInstances.get()
-        .get(OP_MKDIR);
+    static MkdirOp getInstance(OpInstanceCache cache) {
+      return (MkdirOp)cache.get(OP_MKDIR);
     }
 
     MkdirOp setPath(String path) {
@@ -977,9 +986,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_GENSTAMP);
     }
 
-    static SetGenstampOp getInstance() {
-      return (SetGenstampOp)opInstances.get()
-        .get(OP_SET_GENSTAMP);
+    static SetGenstampOp getInstance(OpInstanceCache cache) {
+      return (SetGenstampOp)cache.get(OP_SET_GENSTAMP);
     }
 
     SetGenstampOp setGenerationStamp(long genStamp) {
@@ -1031,9 +1039,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_PERMISSIONS);
     }
 
-    static SetPermissionsOp getInstance() {
-      return (SetPermissionsOp)opInstances.get()
-        .get(OP_SET_PERMISSIONS);
+    static SetPermissionsOp getInstance(OpInstanceCache cache) {
+      return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS);
     }
 
     SetPermissionsOp setSource(String src) {
@@ -1098,9 +1105,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_OWNER);
     }
 
-    static SetOwnerOp getInstance() {
-      return (SetOwnerOp)opInstances.get()
-        .get(OP_SET_OWNER);
+    static SetOwnerOp getInstance(OpInstanceCache cache) {
+      return (SetOwnerOp)cache.get(OP_SET_OWNER);
     }
 
     SetOwnerOp setSource(String src) {
@@ -1179,9 +1185,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_NS_QUOTA);
     }
 
-    static SetNSQuotaOp getInstance() {
-      return (SetNSQuotaOp)opInstances.get()
-        .get(OP_SET_NS_QUOTA);
+    static SetNSQuotaOp getInstance(OpInstanceCache cache) {
+      return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA);
     }
 
     @Override
@@ -1232,9 +1237,8 @@ public abstract class FSEditLogOp {
       super(OP_CLEAR_NS_QUOTA);
     }
 
-    static ClearNSQuotaOp getInstance() {
-      return (ClearNSQuotaOp)opInstances.get()
-        .get(OP_CLEAR_NS_QUOTA);
+    static ClearNSQuotaOp getInstance(OpInstanceCache cache) {
+      return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA);
     }
 
     @Override
@@ -1281,9 +1285,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_QUOTA);
     }
 
-    static SetQuotaOp getInstance() {
-      return (SetQuotaOp)opInstances.get()
-        .get(OP_SET_QUOTA);
+    static SetQuotaOp getInstance(OpInstanceCache cache) {
+      return (SetQuotaOp)cache.get(OP_SET_QUOTA);
     }
 
     SetQuotaOp setSource(String src) {
@@ -1360,9 +1363,8 @@ public abstract class FSEditLogOp {
       super(OP_TIMES);
     }
 
-    static TimesOp getInstance() {
-      return (TimesOp)opInstances.get()
-        .get(OP_TIMES);
+    static TimesOp getInstance(OpInstanceCache cache) {
+      return (TimesOp)cache.get(OP_TIMES);
     }
 
     TimesOp setPath(String path) {
@@ -1458,9 +1460,8 @@ public abstract class FSEditLogOp {
       super(OP_SYMLINK);
     }
 
-    static SymlinkOp getInstance() {
-      return (SymlinkOp)opInstances.get()
-        .get(OP_SYMLINK);
+    static SymlinkOp getInstance(OpInstanceCache cache) {
+      return (SymlinkOp)cache.get(OP_SYMLINK);
     }
 
     SymlinkOp setPath(String path) {
@@ -1579,9 +1580,8 @@ public abstract class FSEditLogOp {
       super(OP_RENAME);
     }
 
-    static RenameOp getInstance() {
-      return (RenameOp)opInstances.get()
-        .get(OP_RENAME);
+    static RenameOp getInstance(OpInstanceCache cache) {
+      return (RenameOp)cache.get(OP_RENAME);
     }
 
     RenameOp setSource(String src) {
@@ -1723,9 +1723,8 @@ public abstract class FSEditLogOp {
       super(OP_REASSIGN_LEASE);
     }
 
-    static ReassignLeaseOp getInstance() {
-      return (ReassignLeaseOp)opInstances.get()
-        .get(OP_REASSIGN_LEASE);
+    static ReassignLeaseOp getInstance(OpInstanceCache cache) {
+      return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE);
     }
 
     ReassignLeaseOp setLeaseHolder(String leaseHolder) {
@@ -1798,9 +1797,8 @@ public abstract class FSEditLogOp {
       super(OP_GET_DELEGATION_TOKEN);
     }
 
-    static GetDelegationTokenOp getInstance() {
-      return (GetDelegationTokenOp)opInstances.get()
-        .get(OP_GET_DELEGATION_TOKEN);
+    static GetDelegationTokenOp getInstance(OpInstanceCache cache) {
+      return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN);
     }
 
     GetDelegationTokenOp setDelegationTokenIdentifier(
@@ -1870,9 +1868,8 @@ public abstract class FSEditLogOp {
       super(OP_RENEW_DELEGATION_TOKEN);
     }
 
-    static RenewDelegationTokenOp getInstance() {
-      return (RenewDelegationTokenOp)opInstances.get()
-          .get(OP_RENEW_DELEGATION_TOKEN);
+    static RenewDelegationTokenOp getInstance(OpInstanceCache cache) {
+      return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN);
     }
 
     RenewDelegationTokenOp setDelegationTokenIdentifier(
@@ -1941,9 +1938,8 @@ public abstract class FSEditLogOp {
       super(OP_CANCEL_DELEGATION_TOKEN);
     }
 
-    static CancelDelegationTokenOp getInstance() {
-      return (CancelDelegationTokenOp)opInstances.get()
-          .get(OP_CANCEL_DELEGATION_TOKEN);
+    static CancelDelegationTokenOp getInstance(OpInstanceCache cache) {
+      return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN);
     }
 
     CancelDelegationTokenOp setDelegationTokenIdentifier(
@@ -1996,9 +1992,8 @@ public abstract class FSEditLogOp {
       super(OP_UPDATE_MASTER_KEY);
     }
 
-    static UpdateMasterKeyOp getInstance() {
-      return (UpdateMasterKeyOp)opInstances.get()
-          .get(OP_UPDATE_MASTER_KEY);
+    static UpdateMasterKeyOp getInstance(OpInstanceCache cache) {
+      return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY);
     }
 
     UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
@@ -2050,8 +2045,9 @@ public abstract class FSEditLogOp {
              code == OP_END_LOG_SEGMENT : "Bad op: " + code;
     }
 
-    static LogSegmentOp getInstance(FSEditLogOpCodes code) {
-      return (LogSegmentOp)opInstances.get().get(code);
+    static LogSegmentOp getInstance(OpInstanceCache cache,
+        FSEditLogOpCodes code) {
+      return (LogSegmentOp)cache.get(code);
     }
 
     public void readFields(DataInputStream in, int logVersion)
@@ -2091,8 +2087,8 @@ public abstract class FSEditLogOp {
       super(OP_INVALID);
     }
 
-    static InvalidOp getInstance() {
-      return (InvalidOp)opInstances.get().get(OP_INVALID);
+    static InvalidOp getInstance(OpInstanceCache cache) {
+      return (InvalidOp)cache.get(OP_INVALID);
     }
 
     @Override
@@ -2207,6 +2203,7 @@ public abstract class FSEditLogOp {
     private final DataInputStream in;
     private final int logVersion;
     private final Checksum checksum;
+    private final OpInstanceCache cache;
 
     /**
      * Construct the reader
@@ -2228,6 +2225,7 @@ public abstract class FSEditLogOp {
       } else {
         this.in = in;
       }
+      this.cache = new OpInstanceCache();
     }
 
     /**
@@ -2236,16 +2234,42 @@ public abstract class FSEditLogOp {
      * Note that the objects returned from this method may be re-used by future
      * calls to the same method.
      * 
+     * @param skipBrokenEdits    If true, attempt to skip over damaged parts of
+     * the input stream, rather than throwing an IOException
      * @return the operation read from the stream, or null at the end of the file
      * @throws IOException on error.
      */
-    public FSEditLogOp readOp() throws IOException {
+    public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
+      FSEditLogOp op = null;
+      while (true) {
+        try {
+          in.mark(in.available());
+          try {
+            op = decodeOp();
+          } finally {
+            // If we encountered an exception or an end-of-file condition,
+            // do not advance the input stream.
+            if (op == null) {
+              in.reset();
+            }
+          }
+          return op;
+        } catch (IOException e) {
+          if (!skipBrokenEdits) {
+            throw e;
+          }
+          if (in.skip(1) < 1) {
+            return null;
+          }
+        }
+      }
+    }
+
+    private FSEditLogOp decodeOp() throws IOException {
       if (checksum != null) {
         checksum.reset();
       }
 
-      in.mark(1);
-
       byte opCodeByte;
       try {
         opCodeByte = in.readByte();
@@ -2255,12 +2279,10 @@ public abstract class FSEditLogOp {
       }
 
       FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-      if (opCode == OP_INVALID) {
-        in.reset(); // reset back to end of file if somebody reads it again
+      if (opCode == OP_INVALID)
         return null;
-      }
 
-      FSEditLogOp op = opInstances.get().get(opCode);
+      FSEditLogOp op = cache.get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
@@ -2268,6 +2290,8 @@ public abstract class FSEditLogOp {
       if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
         // Read the txid
         op.setTransactionId(in.readLong());
+      } else {
+        op.setTransactionId(HdfsConstants.INVALID_TXID);
       }
 
       op.readFields(in, logVersion);
@@ -2426,8 +2450,4 @@ public abstract class FSEditLogOp {
     short mode = Short.valueOf(st.getValue("MODE"));
     return new PermissionStatus(username, groupname, new FsPermission(mode));
   }
-
-  public static FSEditLogOp getOpInstance(FSEditLogOpCodes opCode) {
-    return opInstances.get().get(opCode);
-  }
-}
+		}

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Apr 11 05:47:40 2012
@@ -158,8 +158,8 @@ public class FSImage implements Closeabl
    * @throws IOException
    * @return true if the image needs to be saved or false otherwise
    */
-  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
-      throws IOException {
+  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
+      MetaRecoveryContext recovery) throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
     
@@ -244,7 +244,7 @@ public class FSImage implements Closeabl
       // just load the image
     }
     
-    return loadFSImage(target);
+    return loadFSImage(target, recovery);
   }
   
   /**
@@ -304,7 +304,7 @@ public class FSImage implements Closeabl
     if(storage.getDistributedUpgradeState()) {
       // only distributed upgrade need to continue
       // don't do version upgrade
-      this.loadFSImage(target);
+      this.loadFSImage(target, null);
       storage.initializeDistributedUpgrade();
       return;
     }
@@ -319,7 +319,7 @@ public class FSImage implements Closeabl
     }
 
     // load the latest image
-    this.loadFSImage(target);
+    this.loadFSImage(target, null);
 
     // Do upgrade for each directory
     long oldCTime = storage.getCTime();
@@ -505,7 +505,7 @@ public class FSImage implements Closeabl
     target.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
-      ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
+      ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
     } finally {
       ckptImage.close();
     }
@@ -550,7 +550,7 @@ public class FSImage implements Closeabl
     target.dir.reset();
 
     LOG.debug("Reloading namespace from " + file);
-    loadFSImage(file, target);
+    loadFSImage(file, target, null);
   }
 
   /**
@@ -568,7 +568,8 @@ public class FSImage implements Closeabl
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage(FSNamesystem target) throws IOException {
+  boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
+      throws IOException {
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     
     isUpgradeFinalized = inspector.isUpgradeFinalized();
@@ -583,7 +584,6 @@ public class FSImage implements Closeabl
       // We only want to recover streams if we're going into Active mode.
       editLog.recoverUnclosedStreams();
     }
-
     if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
                                getLayoutVersion())) {
       // If we're open for write, we're either non-HA or we're the active NN, so
@@ -610,7 +610,7 @@ public class FSImage implements Closeabl
                                  getLayoutVersion())) {
         // For txid-based layout, we should have a .md5 file
         // next to the image file
-        loadFSImage(imageFile.getFile(), target);
+        loadFSImage(imageFile.getFile(), target, recovery);
       } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
                                         getLayoutVersion())) {
         // In 0.22, we have the checksum stored in the VERSION file.
@@ -622,22 +622,19 @@ public class FSImage implements Closeabl
               NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
               " not set for storage directory " + sdForProperties.getRoot());
         }
-        loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
+        loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery);
       } else {
         // We don't have any record of the md5sum
-        loadFSImage(imageFile.getFile(), null, target);
+        loadFSImage(imageFile.getFile(), null, target, recovery);
       }
     } catch (IOException ioe) {
       FSEditLog.closeAllStreams(editStreams);
       throw new IOException("Failed to load image from " + imageFile, ioe);
     }
-    
-    long numLoaded = loadEdits(editStreams, target);
+    long txnsAdvanced = loadEdits(editStreams, target, recovery);
     needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
-                                                    numLoaded);
-    
-    // update the txid for the edit log
-    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
+                                                    txnsAdvanced);
+    editLog.setNextTxId(lastAppliedTxId + 1);
     return needToSave;
   }
 
@@ -664,33 +661,29 @@ public class FSImage implements Closeabl
   
   /**
    * Load the specified list of edit files into the image.
-   * @return the number of transactions loaded
    */
   public long loadEdits(Iterable<EditLogInputStream> editStreams,
-      FSNamesystem target) throws IOException, EditLogInputException {
+      FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
-
-    long startingTxId = getLastAppliedTxId() + 1;
-    long numLoaded = 0;
-
+    
+    long prevLastAppliedTxId = lastAppliedTxId;  
     try {    
-      FSEditLogLoader loader = new FSEditLogLoader(target);
+      FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
       
       // Load latest edits
       for (EditLogInputStream editIn : editStreams) {
-        LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
-        long thisNumLoaded = 0;
+        LOG.info("Reading " + editIn + " expecting start txid #" +
+              (lastAppliedTxId + 1));
         try {
-          thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
-        } catch (EditLogInputException elie) {
-          thisNumLoaded = elie.getNumEditsLoaded();
-          throw elie;
+          loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
         } finally {
           // Update lastAppliedTxId even in case of error, since some ops may
           // have been successfully applied before the error.
-          lastAppliedTxId = startingTxId + thisNumLoaded - 1;
-          startingTxId += thisNumLoaded;
-          numLoaded += thisNumLoaded;
+          lastAppliedTxId = loader.getLastAppliedTxId();
+        }
+        // If we are in recovery mode, we may have skipped over some txids.
+        if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
+          lastAppliedTxId = editIn.getLastTxId();
         }
       }
     } finally {
@@ -698,8 +691,7 @@ public class FSImage implements Closeabl
       // update the counts
       target.dir.updateCountForINodeWithQuota();   
     }
-    
-    return numLoaded;
+    return lastAppliedTxId - prevLastAppliedTxId;
   }
 
 
@@ -707,14 +699,14 @@ public class FSImage implements Closeabl
    * Load the image namespace from the given image file, verifying
    * it against the MD5 sum stored in its associated .md5 file.
    */
-  private void loadFSImage(File imageFile, FSNamesystem target)
-      throws IOException {
+  private void loadFSImage(File imageFile, FSNamesystem target,
+      MetaRecoveryContext recovery) throws IOException {
     MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
     if (expectedMD5 == null) {
       throw new IOException("No MD5 file found corresponding to image file "
           + imageFile);
     }
-    loadFSImage(imageFile, expectedMD5, target);
+    loadFSImage(imageFile, expectedMD5, target, recovery);
   }
   
   /**
@@ -722,7 +714,7 @@ public class FSImage implements Closeabl
    * filenames and blocks.
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
-      FSNamesystem target) throws IOException {
+      FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, target);
     loader.load(curFile);

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Wed Apr 11 05:47:40 2012
@@ -56,7 +56,14 @@ class FSImageTransactionalStorageInspect
       return;
     }
     
-    maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    // Check for a seen_txid file, which marks a minimum transaction ID that
+    // must be included in our load plan.
+    try {
+      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    } catch (IOException ioe) {
+      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
+      return;
+    }
 
     File currentDir = sd.getCurrentDir();
     File filesInStorage[];
@@ -91,15 +98,6 @@ class FSImageTransactionalStorageInspect
       }
     }
     
-
-    // Check for a seen_txid file, which marks a minimum transaction ID that
-    // must be included in our load plan.
-    try {
-      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
-    } catch (IOException ioe) {
-      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
-    }
-    
     // set finalized flag
     isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
   }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 11 05:47:40 2012
@@ -380,9 +380,12 @@ public class FSNamesystem implements Nam
 
     FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
     FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
+    StartupOption startOpt = NameNode.getStartupOption(conf);
+    if (startOpt == StartupOption.RECOVER) {
+      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    }
 
     long loadStart = now();
-    StartupOption startOpt = NameNode.getStartupOption(conf);
     String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
     namesystem.loadFSImage(startOpt, fsImage,
       HAUtil.isHAEnabled(conf, nameserviceId));
@@ -491,7 +494,8 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       // We shouldn't be calling saveNamespace if we've come up in standby state.
-      if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) {
+      MetaRecoveryContext recovery = startOpt.createRecoveryContext();
+      if (fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled) {
         fsImage.saveNamespace(this);
       }
       // This will start a new log segment and write to the seen_txid file, so
@@ -2120,10 +2124,12 @@ public class FSNamesystem implements Nam
 
   /** 
    * Check all blocks of a file. If any blocks are lower than their intended
-   * replication factor, then insert them into neededReplication
+   * replication factor, then insert them into neededReplication; if any
+   * blocks have more replicas than their intended replication factor,
+   * then insert them into invalidateBlocks.
    */
   private void checkReplicationFactor(INodeFile file) {
-    int numExpectedReplicas = file.getReplication();
+    short numExpectedReplicas = file.getReplication();
     Block[] pendingBlocks = file.getBlocks();
     int nrBlocks = pendingBlocks.length;
     for (int i = 0; i < nrBlocks; i++) {

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Apr 11 05:47:40 2012
@@ -232,7 +232,10 @@ class FileJournalManager implements Jour
           LOG.info(String.format("Log begins at txid %d, but requested start "
               + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
               transactionsToSkip));
-          elfis.skipTransactions(transactionsToSkip);
+        }
+        if (elfis.skipUntil(fromTxId) == false) {
+          throw new IOException("failed to advance input stream to txid " +
+              fromTxId);
         }
         return elfis;
       }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Wed Apr 11 05:47:40 2012
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
 
@@ -299,8 +298,7 @@ public class NNStorage extends Storage i
                           NameNodeDirType.IMAGE;
       // Add to the list of storage directories, only if the
       // URI is of type file://
-      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
-          == 0){
+      if(dirName.getScheme().compareTo("file") == 0) {
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
             dirType,
             !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
@@ -312,8 +310,7 @@ public class NNStorage extends Storage i
       checkSchemeConsistency(dirName);
       // Add to the list of storage directories, only if the
       // URI is of type file://
-      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
-          == 0)
+      if(dirName.getScheme().compareTo("file") == 0)
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
                     NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
     }

Modified: hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Apr 11 05:47:40 2012
@@ -73,6 +73,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -233,7 +234,7 @@ public class NameNode {
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
   public static void format(Configuration conf) throws IOException {
-    format(conf, true);
+    format(conf, true, true);
   }
 
   static NameNodeMetrics metrics;
@@ -532,6 +533,8 @@ public class NameNode {
    * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
    * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster  
    * upgrade and create a snapshot of the current file system state</li> 
+   * <li>{@link StartupOption#RECOVER RECOVER} - recover name node
+   * metadata</li>
    * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the  
    *            cluster back to the previous state</li>
    * <li>{@link StartupOption#FINALIZE FINALIZE} - finalize 
@@ -674,9 +677,8 @@ public class NameNode {
    * @return true if formatting was aborted, false otherwise
    * @throws IOException
    */
-  private static boolean format(Configuration conf,
-                                boolean force)
-      throws IOException {
+  private static boolean format(Configuration conf, boolean force,
+      boolean isInteractive) throws IOException {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
@@ -685,7 +687,7 @@ public class NameNode {
     Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
     List<URI> editDirsToFormat = 
                  FSNamesystem.getNamespaceEditsDirs(conf);
-    if (!confirmFormat(dirsToFormat, force, true)) {
+    if (!confirmFormat(dirsToFormat, force, isInteractive)) {
       return true; // aborted
     }
 
@@ -776,6 +778,9 @@ public class NameNode {
    */
   private static boolean initializeSharedEdits(Configuration conf,
       boolean force, boolean interactive) {
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+    initializeGenericKeys(conf, nsId, namenodeId);
     NNStorage existingStorage = null;
     try {
       FSNamesystem fsns = FSNamesystem.loadFromDisk(conf,
@@ -843,14 +848,17 @@ public class NameNode {
       "Usage: java NameNode [" +
       StartupOption.BACKUP.getName() + "] | [" +
       StartupOption.CHECKPOINT.getName() + "] | [" +
-      StartupOption.FORMAT.getName() + "[" + StartupOption.CLUSTERID.getName() +  
-      " cid ]] | [" +
+      StartupOption.FORMAT.getName() + " [" + StartupOption.CLUSTERID.getName() +  
+      " cid ] [" + StartupOption.FORCE.getName() + "] [" +
+      StartupOption.NONINTERACTIVE.getName() + "] ] | [" +
       StartupOption.UPGRADE.getName() + "] | [" +
       StartupOption.ROLLBACK.getName() + "] | [" +
       StartupOption.FINALIZE.getName() + "] | [" +
       StartupOption.IMPORT.getName() + "] | [" +
-      StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
-      StartupOption.INITIALIZESHAREDEDITS.getName() + "]");
+      StartupOption.INITIALIZESHAREDEDITS.getName() + "] | [" +
+      StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" + 
+      StartupOption.RECOVER.getName() + " [ " +
+        StartupOption.FORCE.getName() + " ] ]");
   }
 
   private static StartupOption parseArguments(String args[]) {
@@ -860,11 +868,35 @@ public class NameNode {
       String cmd = args[i];
       if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.FORMAT;
-        // might be followed by two args
-        if (i + 2 < argsLen
-            && args[i + 1].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
-          i += 2;
-          startOpt.setClusterId(args[i]);
+        for (i = i + 1; i < argsLen; i++) {
+          if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
+            i++;
+            if (i >= argsLen) {
+              // if no cluster id specified, return null
+              LOG.fatal("Must specify a valid cluster ID after the "
+                  + StartupOption.CLUSTERID.getName() + " flag");
+              return null;
+            }
+            String clusterId = args[i];
+            // Make sure an id is specified and not another flag
+            if (clusterId.isEmpty() ||
+                clusterId.equalsIgnoreCase(StartupOption.FORCE.getName()) ||
+                clusterId.equalsIgnoreCase(
+                    StartupOption.NONINTERACTIVE.getName())) {
+              LOG.fatal("Must specify a valid cluster ID after the "
+                  + StartupOption.CLUSTERID.getName() + " flag");
+              return null;
+            }
+            startOpt.setClusterId(clusterId);
+          }
+
+          if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
+            startOpt.setForceFormat(true);
+          }
+
+          if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
+            startOpt.setInteractiveFormat(false);
+          }
         }
       } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.GENCLUSTERID;
@@ -894,6 +926,21 @@ public class NameNode {
       } else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.INITIALIZESHAREDEDITS;
         return startOpt;
+      } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
+        if (startOpt != StartupOption.REGULAR) {
+          throw new RuntimeException("Can't combine -recover with " +
+              "other startup options.");
+        }
+        startOpt = StartupOption.RECOVER;
+        while (++i < argsLen) {
+          if (args[i].equalsIgnoreCase(
+                StartupOption.FORCE.getName())) {
+            startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
+          } else {
+            throw new RuntimeException("Error parsing recovery options: " + 
+              "can't understand option \"" + args[i] + "\"");
+          }
+        }
       } else {
         return null;
       }
@@ -910,31 +957,36 @@ public class NameNode {
                                           StartupOption.REGULAR.toString()));
   }
 
-  /**
-   * Print out a prompt to the user, and return true if the user
-   * responds with "Y" or "yes".
-   */
-  static boolean confirmPrompt(String prompt) throws IOException {
-    while (true) {
-      System.err.print(prompt + " (Y or N) ");
-      StringBuilder responseBuilder = new StringBuilder();
-      while (true) {
-        int c = System.in.read();
-        if (c == -1 || c == '\r' || c == '\n') {
-          break;
-        }
-        responseBuilder.append((char)c);
-      }
-  
-      String response = responseBuilder.toString();
-      if (response.equalsIgnoreCase("y") ||
-          response.equalsIgnoreCase("yes")) {
-        return true;
-      } else if (response.equalsIgnoreCase("n") ||
-          response.equalsIgnoreCase("no")) {
-        return false;
+  private static void doRecovery(StartupOption startOpt, Configuration conf)
+      throws IOException {
+    if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
+      if (!confirmPrompt("You have selected Metadata Recovery mode.  " +
+          "This mode is intended to recover lost metadata on a corrupt " +
+          "filesystem.  Metadata recovery mode often permanently deletes " +
+          "data from your HDFS filesystem.  Please back up your edit log " +
+          "and fsimage before trying this!\n\n" +
+          "Are you ready to proceed? (Y/N)\n")) {
+        System.err.println("Recovery aborted at user request.\n");
+        return;
       }
-      // else ask them again
+    }
+    MetaRecoveryContext.LOG.info("starting recovery...");
+    UserGroupInformation.setConfiguration(conf);
+    NameNode.initMetrics(conf, startOpt.toNodeRole());
+    FSNamesystem fsn = null;
+    try {
+      fsn = FSNamesystem.loadFromDisk(conf);
+      fsn.saveNamespace();
+      MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
+    } catch (IOException e) {
+      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
+      throw e;
+    } catch (RuntimeException e) {
+      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
+      throw e;
+    } finally {
+      if (fsn != null)
+        fsn.close();
     }
   }
 
@@ -959,7 +1011,8 @@ public class NameNode {
 
     switch (startOpt) {
       case FORMAT: {
-        boolean aborted = format(conf, false);
+        boolean aborted = format(conf, startOpt.getForceFormat(),
+            startOpt.getInteractiveFormat());
         System.exit(aborted ? 1 : 0);
         return null; // avoid javac warning
       }
@@ -991,6 +1044,10 @@ public class NameNode {
         DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
         return new BackupNode(conf, role);
       }
+      case RECOVER: {
+        NameNode.doRecovery(startOpt, conf);
+        return null;
+      }
       default:
         DefaultMetricsSystem.initialize("NameNode");
         return new NameNode(conf);



Mime
View raw message