hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1296534 [5/11] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/ src...
Date: Sat, 03 Mar 2012 00:43:00 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Sat Mar  3 00:42:49 2012
@@ -133,4 +133,9 @@ class EditLogBackupInputStream extends E
   public long getLastTxId() throws IOException {
     return HdfsConstants.INVALID_TXID;
   }
+
+  @Override
+  public boolean isInProgress() {
+    return true;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Mar  3 00:42:49 2012
@@ -22,12 +22,14 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * An implementation of the abstract class {@link EditLogOutputStream},
@@ -40,7 +42,7 @@ import org.apache.hadoop.net.NetUtils;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private JournalProtocolTranslatorPB backupNode;  // RPC proxy to backup node
+  private JournalProtocol backupNode;  // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
   private EditsDoubleBuffer doubleBuf;
@@ -55,8 +57,9 @@ class EditLogBackupOutputStream extends 
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
     try {
-      this.backupNode =
-          new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration());
+      this.backupNode = NameNodeProxies.createNonHAProxy(new HdfsConfiguration(),
+          bnAddress, JournalProtocol.class, UserGroupInformation.getCurrentUser(),
+          true).getProxy();
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
@@ -93,14 +96,14 @@ class EditLogBackupOutputStream extends 
       throw new IOException("BackupEditStream has " + size +
                           " records still to be flushed and cannot be closed.");
     } 
-    IOUtils.cleanup(Storage.LOG, backupNode); // stop the RPC threads
+    RPC.stopProxy(backupNode); // stop the RPC threads
     doubleBuf.close();
     doubleBuf = null;
   }
 
   @Override
   public void abort() throws IOException {
-    IOUtils.cleanup(Storage.LOG, backupNode);
+    RPC.stopProxy(backupNode);
     doubleBuf = null;
   }
 

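The hunk above swaps the dedicated JournalProtocolTranslatorPB for the generic NameNodeProxies factory. A minimal sketch of the resulting pattern, using only the calls visible in this diff (the backup-node address is a placeholder, and IOException handling is elided):

    JournalProtocol proxy = NameNodeProxies.createNonHAProxy(
        new HdfsConfiguration(),
        NetUtils.createSocketAddr("backupnode.example.com:50100"), // placeholder
        JournalProtocol.class,
        UserGroupInformation.getCurrentUser(),
        true).getProxy();
    try {
      // send journal edits through the proxy ...
    } finally {
      RPC.stopProxy(proxy); // stops the RPC threads, as close() and abort() now do
    }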
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Sat Mar  3 00:42:49 2012
@@ -41,6 +41,7 @@ class EditLogFileInputStream extends Edi
   private final int logVersion;
   private final FSEditLogOp.Reader reader;
   private final FSEditLogLoader.PositionTrackingInputStream tracker;
+  private final boolean isInProgress;
   
   /**
    * Open an EditLogInputStream for the given file.
@@ -53,7 +54,7 @@ class EditLogFileInputStream extends Edi
    */
   EditLogFileInputStream(File name)
       throws LogHeaderCorruptException, IOException {
-    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
+    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false);
   }
 
   /**
@@ -66,7 +67,8 @@ class EditLogFileInputStream extends Edi
    * @throws IOException if an actual IO error occurs while reading the
    *         header
    */
-  EditLogFileInputStream(File name, long firstTxId, long lastTxId)
+  EditLogFileInputStream(File name, long firstTxId, long lastTxId,
+      boolean isInProgress)
       throws LogHeaderCorruptException, IOException {
     file = name;
     fStream = new FileInputStream(name);
@@ -84,6 +86,25 @@ class EditLogFileInputStream extends Edi
     reader = new FSEditLogOp.Reader(in, logVersion);
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
+    this.isInProgress = isInProgress;
+  }
+
+  /**
+   * Skip over a number of transactions. Subsequent calls to
+   * {@link EditLogFileInputStream#readOp()} will begin after these skipped
+   * transactions. If more transactions are requested to be skipped than remain
+   * in the edit log, all edit log ops in the log will be skipped and subsequent
+   * calls to {@link EditLogInputStream#readOp} will return null.
+   * 
+   * @param transactionsToSkip number of transactions to skip over.
+   * @throws IOException if there's an error while reading an operation
+   */
+  public void skipTransactions(long transactionsToSkip) throws IOException {
+    assert firstTxId != HdfsConstants.INVALID_TXID &&
+        lastTxId != HdfsConstants.INVALID_TXID;
+    for (long i = 0; i < transactionsToSkip; i++) {
+      reader.readOp();
+    }
   }
 
   @Override
@@ -133,6 +154,11 @@ class EditLogFileInputStream extends Edi
   }
   
   @Override
+  public boolean isInProgress() {
+    return isInProgress;
+  }
+  
+  @Override
   public String toString() {
     return getName();
   }
@@ -142,11 +168,11 @@ class EditLogFileInputStream extends Edi
     try {
       in = new EditLogFileInputStream(file);
     } catch (LogHeaderCorruptException corrupt) {
-      // If it's missing its header, this is equivalent to no transactions
+      // If the header is malformed or holds an unexpected value, this indicates corruption
       FSImage.LOG.warn("Log at " + file + " has no valid header",
           corrupt);
-      return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, 
-                                                   HdfsConstants.INVALID_TXID);
+      return new FSEditLogLoader.EditLogValidation(0,
+          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true);
     }
     
     try {
@@ -172,14 +198,13 @@ class EditLogFileInputStream extends Edi
       throw new LogHeaderCorruptException(
           "Reached EOF when reading log header");
     }
-    if (logVersion < HdfsConstants.LAYOUT_VERSION) { // future version
+    if (logVersion < HdfsConstants.LAYOUT_VERSION || // future version
+        logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION) { // unsupported
       throw new LogHeaderCorruptException(
           "Unexpected version of the file system log file: "
           + logVersion + ". Current version = "
           + HdfsConstants.LAYOUT_VERSION + ".");
     }
-    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-      "Unsupported version " + logVersion;
     return logVersion;
   }
   

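A sketch of how the new skipTransactions method might be driven by a same-package caller (the constructor is package-private; the file name and transaction ids are illustrative, and exceptions are left to propagate):

    EditLogFileInputStream in = new EditLogFileInputStream(
        new File("edits_inprogress_0000000000000000001"), // illustrative name
        1L, 100L, true);
    in.skipTransactions(50);      // discard the first 50 ops
    FSEditLogOp op = in.readOp(); // the 51st transaction, or null if none remain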
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Sat Mar  3 00:42:49 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -27,6 +28,7 @@ import java.nio.channels.FileChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.IOUtils;
 
@@ -36,7 +38,8 @@ import com.google.common.annotations.Vis
  * An implementation of the abstract class {@link EditLogOutputStream}, which
  * stores edits in a local file.
  */
-class EditLogFileOutputStream extends EditLogOutputStream {
+@InterfaceAudience.Private
+public class EditLogFileOutputStream extends EditLogOutputStream {
   private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
 
   private File file;
@@ -96,11 +99,23 @@ class EditLogFileOutputStream extends Ed
   public void create() throws IOException {
     fc.truncate(0);
     fc.position(0);
-    doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION);
+    writeHeader(doubleBuf.getCurrentBuf());
     setReadyToFlush();
     flush();
   }
 
+  /**
+   * Write header information for this EditLogFileOutputStream to the provided
+   * DataOutputStream.
+   * 
+   * @param out the output stream to write the header to.
+   * @throws IOException in the event of error writing to the stream.
+   */
+  @VisibleForTesting
+  public static void writeHeader(DataOutputStream out) throws IOException {
+    out.writeInt(HdfsConstants.LAYOUT_VERSION);
+  }
+
   @Override
   public void close() throws IOException {
     if (fp == null) {
@@ -204,6 +219,11 @@ class EditLogFileOutputStream extends Ed
   File getFile() {
     return file;
   }
+  
+  @Override
+  public String toString() {
+    return "EditLogFileOutputStream(" + file + ")";
+  }
 
   /**
    * @return true if this stream is currently open.

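Since writeHeader is now a public @VisibleForTesting hook, a test can produce a header byte-for-byte identical to what create() writes. A minimal sketch:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    EditLogFileOutputStream.writeHeader(out); // writes LAYOUT_VERSION as an int
    out.flush();
    byte[] header = baos.toByteArray();       // 4 bytes: the layout version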
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Sat Mar  3 00:42:49 2012
@@ -22,6 +22,9 @@ import org.apache.hadoop.classification.
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * A generic abstract class to support reading edits log data from 
  * persistent storage.
@@ -79,4 +82,9 @@ public abstract class EditLogInputStream
    * Return the size of the current edits log.
    */
   public abstract long length() throws IOException;
+  
+  /**
+   * Return true if this stream is in progress, false if it is finalized.
+   */
+  public abstract boolean isInProgress();
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sat Mar  3 00:42:49 2012
@@ -261,113 +261,32 @@ public class FSDirectory implements Clos
    */
   INode unprotectedAddFile( String path, 
                             PermissionStatus permissions,
-                            BlockInfo[] blocks, 
                             short replication,
                             long modificationTime,
                             long atime,
                             long preferredBlockSize,
+                            boolean underConstruction,
                             String clientName,
                             String clientMachine)
       throws UnresolvedLinkException {
     INode newNode;
     assert hasWriteLock();
-    if (blocks == null)
-      newNode = new INodeDirectory(permissions, modificationTime);
-    else if(blocks.length == 0 || blocks[blocks.length-1].getBlockUCState()
-        == BlockUCState.UNDER_CONSTRUCTION) {
+    if (underConstruction) {
       newNode = new INodeFileUnderConstruction(
-          permissions, blocks.length, replication,
+          permissions, replication,
           preferredBlockSize, modificationTime, clientName, 
           clientMachine, null);
     } else {
-      newNode = new INodeFile(permissions, blocks.length, replication,
+      newNode = new INodeFile(permissions, 0, replication,
                               modificationTime, atime, preferredBlockSize);
     }
-    writeLock();
-    try {
-      try {
-        newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
-        if(newNode != null && blocks != null) {
-          int nrBlocks = blocks.length;
-          // Add file->block mapping
-          INodeFile newF = (INodeFile)newNode;
-          for (int i = 0; i < nrBlocks; i++) {
-            newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
-          }
-        }
-      } catch (IOException e) {
-        return null;
-      }
-      return newNode;
-    } finally {
-      writeUnlock();
-    }
 
-  }
-
-  /**
-   * Update files in-memory data structures with new block information.
-   * @throws IOException 
-   */
-  void updateFile(INodeFile file,
-                  String path,
-                  BlockInfo[] blocks, 
-                  long mtime,
-                  long atime) throws IOException {
-
-    // Update the salient file attributes.
-    file.setAccessTime(atime);
-    file.setModificationTimeForce(mtime);
-
-    // Update its block list
-    BlockInfo[] oldBlocks = file.getBlocks();
-
-    // Are we only updating the last block's gen stamp.
-    boolean isGenStampUpdate = oldBlocks.length == blocks.length;
-
-    // First, update blocks in common
-    BlockInfo oldBlock = null;
-    for (int i = 0; i < oldBlocks.length && i < blocks.length; i++) {
-      oldBlock = oldBlocks[i];
-      Block newBlock = blocks[i];
-
-      boolean isLastBlock = i == oldBlocks.length - 1;
-      if (oldBlock.getBlockId() != newBlock.getBlockId() ||
-          (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && 
-              !(isGenStampUpdate && isLastBlock))) {
-        throw new IOException("Mismatched block IDs or generation stamps, " + 
-            "attempting to replace block " + oldBlock + " with " + newBlock +
-            " as block # " + i + "/" + blocks.length + " of " + path);
-      }
-
-      oldBlock.setNumBytes(newBlock.getNumBytes());
-      oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
-    }
-
-    if (blocks.length < oldBlocks.length) {
-      // We're removing a block from the file, e.g. abandonBlock(...)
-      if (!file.isUnderConstruction()) {
-        throw new IOException("Trying to remove a block from file " +
-            path + " which is not under construction.");
-      }
-      if (blocks.length != oldBlocks.length - 1) {
-        throw new IOException("Trying to remove more than one block from file "
-            + path);
-      }
-      unprotectedRemoveBlock(path,
-          (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
-    } else if (blocks.length > oldBlocks.length) {
-      // We're adding blocks
-      // First complete last old Block
-      getBlockManager().completeBlock(file, oldBlocks.length-1, true);
-      // Add the new blocks
-      for (int i = oldBlocks.length; i < blocks.length; i++) {
-        // addBlock();
-        BlockInfo newBI = blocks[i];
-        getBlockManager().addINode(newBI, file);
-        file.addBlock(newBI);
-      }
+    try {
+      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
+    } catch (IOException e) {
+      return null;
     }
+    return newNode;
   }
 
   INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
@@ -450,7 +369,7 @@ public class FSDirectory implements Clos
 
     writeLock();
     try {
-      fsImage.getEditLog().logOpenFile(path, file);
+      fsImage.getEditLog().logUpdateBlocks(path, file);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
             +path+" with "+ file.getBlocks().length 
@@ -460,7 +379,7 @@ public class FSDirectory implements Clos
       writeUnlock();
     }
   }
-
+  
   /**
    * Close file.
    */
@@ -483,7 +402,7 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Remove a block to the file.
+   * Remove a block from the file.
    */
   boolean removeBlock(String path, INodeFileUnderConstruction fileNode, 
                       Block block) throws IOException {
@@ -499,7 +418,7 @@ public class FSDirectory implements Clos
     }
     return true;
   }
-
+  
   void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode, 
       Block block) throws IOException {
     // modify file-> block and blocksMap

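After this refactoring, unprotectedAddFile no longer accepts a block array; the caller passes an explicit underConstruction flag and wires up blocks separately. The new call shape, as the edit-log loader uses it later in this commit (variable names are placeholders):

    INode newNode = fsDir.unprotectedAddFile(
        path, permissions, replication,
        mtime, atime, blockSize,
        true,            // underConstruction: build an INodeFileUnderConstruction
        clientName, clientMachine);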
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Mar  3 00:42:49 2012
@@ -62,22 +62,36 @@ public class FSEditLog  {
 
   /**
    * State machine for edit log.
+   * 
+   * In a non-HA setup:
+   * 
 * The log starts in UNINITIALIZED state upon construction. Once it's
-   * initialized, it is usually in IN_SEGMENT state, indicating that edits
-   * may be written. In the middle of a roll, or while saving the namespace,
-   * it briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the
-   * previous segment has been closed, but the new one has not yet been opened.
+   * initialized, it is usually in IN_SEGMENT state, indicating that edits may
+   * be written. In the middle of a roll, or while saving the namespace, it
+   * briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the previous
+   * segment has been closed, but the new one has not yet been opened.
+   * 
+   * In an HA setup:
+   * 
+   * The log starts in UNINITIALIZED state upon construction. Once it's
+   * initialized, it sits in the OPEN_FOR_READING state the entire time that the
+   * NN is in standby. Upon the NN transition to active, the log will be CLOSED,
+   * and then move to being BETWEEN_LOG_SEGMENTS, much as if the NN had just
+   * started up, and then will move to IN_SEGMENT so it can begin writing to the
+   * log. The log states will then revert to behaving as they do in a non-HA
+   * setup.
    */
   private enum State {
     UNINITIALIZED,
     BETWEEN_LOG_SEGMENTS,
     IN_SEGMENT,
+    OPEN_FOR_READING,
     CLOSED;
   }  
   private State state = State.UNINITIALIZED;
   
   //initialize
-  private JournalSet journalSet;
+  private JournalSet journalSet = null;
   private EditLogOutputStream editLogStream = null;
 
   // a monotonically increasing counter that represents transactionIds.
@@ -112,7 +126,12 @@ public class FSEditLog  {
   private NNStorage storage;
   private Configuration conf;
   
-  private Collection<URI> editsDirs;
+  private List<URI> editsDirs;
+  
+  /**
+   * The edit directories that are shared between primary and secondary.
+   */
+  private List<URI> sharedEditsDirs;
 
   private static class TransactionId {
     public long txid;
@@ -151,11 +170,11 @@ public class FSEditLog  {
    * @param storage Storage object used by namenode
    * @param editsDirs List of journals to use
    */
-  FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
+  FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
     init(conf, storage, editsDirs);
   }
   
-  private void init(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
+  private void init(Configuration conf, NNStorage storage, List<URI> editsDirs) {
     isSyncRunning = false;
     this.conf = conf;
     this.storage = storage;
@@ -165,19 +184,44 @@ public class FSEditLog  {
     // If this list is empty, an error will be thrown on first use
     // of the editlog, as no journals will exist
     this.editsDirs = Lists.newArrayList(editsDirs);
+
+    this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
+  }
+  
+  public synchronized void initJournalsForWrite() {
+    Preconditions.checkState(state == State.UNINITIALIZED ||
+        state == State.CLOSED, "Unexpected state: %s", state);
     
+    initJournals(this.editsDirs);
+    state = State.BETWEEN_LOG_SEGMENTS;
+  }
+  
+  public synchronized void initSharedJournalsForRead() {
+    if (state == State.OPEN_FOR_READING) {
+      LOG.warn("Initializing shared journals for READ, already open for READ",
+          new Exception());
+      return;
+    }
+    Preconditions.checkState(state == State.UNINITIALIZED ||
+        state == State.CLOSED);
+    
+    initJournals(this.sharedEditsDirs);
+    state = State.OPEN_FOR_READING;
+  }
+  
+  private synchronized void initJournals(List<URI> dirs) {
     int minimumRedundantJournals = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
 
     journalSet = new JournalSet(minimumRedundantJournals);
-    for (URI u : this.editsDirs) {
+    for (URI u : dirs) {
       boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
           .contains(u);
       if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
         StorageDirectory sd = storage.getStorageDirectory(u);
         if (sd != null) {
-          journalSet.add(new FileJournalManager(sd), required);
+          journalSet.add(new FileJournalManager(sd, storage), required);
         }
       } else {
         journalSet.add(createJournal(u), required);
@@ -187,7 +231,6 @@ public class FSEditLog  {
     if (journalSet.isEmpty()) {
       LOG.error("No edits directories configured!");
     } 
-    state = State.BETWEEN_LOG_SEGMENTS;
   }
 
   /**
@@ -202,18 +245,51 @@ public class FSEditLog  {
    * Initialize the output stream for logging, opening the first
    * log segment.
    */
-  synchronized void open() throws IOException {
-    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS);
+  synchronized void openForWrite() throws IOException {
+    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+        "Bad state: %s", state);
 
-    startLogSegment(getLastWrittenTxId() + 1, true);
+    long segmentTxId = getLastWrittenTxId() + 1;
+    // Safety check: we should never start a segment if there are
+    // newer txids readable.
+    EditLogInputStream s = journalSet.getInputStream(segmentTxId, true);
+    try {
+      Preconditions.checkState(s == null,
+          "Cannot start writing at txid %s when there is a stream " +
+          "available for read: %s", segmentTxId, s);
+    } finally {
+      IOUtils.closeStream(s);
+    }
+    
+    startLogSegment(segmentTxId, true);
     assert state == State.IN_SEGMENT : "Bad state: " + state;
   }
   
-  synchronized boolean isOpen() {
+  /**
+   * @return true if the log is currently open in write mode, regardless
+   * of whether it actually has an open segment.
+   */
+  synchronized boolean isOpenForWrite() {
+    return state == State.IN_SEGMENT ||
+      state == State.BETWEEN_LOG_SEGMENTS;
+  }
+  
+  /**
+   * @return true if the log is open in write mode and has a segment open
+   * ready to take edits.
+   */
+  synchronized boolean isSegmentOpen() {
     return state == State.IN_SEGMENT;
   }
 
   /**
+   * @return true if the log is open in read mode.
+   */
+  public synchronized boolean isOpenForRead() {
+    return state == State.OPEN_FOR_READING;
+  }
+
+  /**
    * Shutdown the file store.
    */
   synchronized void close() {
@@ -242,7 +318,8 @@ public class FSEditLog  {
    */
   void logEdit(final FSEditLogOp op) {
     synchronized (this) {
-      assert state != State.CLOSED;
+      assert isOpenForWrite() :
+        "bad state: " + state;
       
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
@@ -329,7 +406,7 @@ public class FSEditLog  {
   /**
    * Return the transaction ID of the last transaction written to the log.
    */
-  synchronized long getLastWrittenTxId() {
+  public synchronized long getLastWrittenTxId() {
     return txid;
   }
   
@@ -337,7 +414,7 @@ public class FSEditLog  {
    * @return the first transaction ID in the current log segment
    */
   synchronized long getCurSegmentTxId() {
-    Preconditions.checkState(state == State.IN_SEGMENT,
+    Preconditions.checkState(isSegmentOpen(),
         "Bad state: %s", state);
     return curSegmentTxId;
   }
@@ -549,6 +626,13 @@ public class FSEditLog  {
     logEdit(op);
   }
   
+  public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
+    UpdateBlocksOp op = UpdateBlocksOp.getInstance()
+      .setPath(path)
+      .setBlocks(file.getBlocks());
+    logEdit(op);
+  }
+  
   /** 
    * Add create directory record to edit log
    */
@@ -724,16 +808,25 @@ public class FSEditLog  {
    * Used only by unit tests.
    */
   @VisibleForTesting
-  List<JournalAndStream> getJournals() {
+  synchronized List<JournalAndStream> getJournals() {
     return journalSet.getAllJournalStreams();
   }
   
   /**
+   * Used only by tests.
+   */
+  @VisibleForTesting
+  synchronized public JournalSet getJournalSet() {
+    return journalSet;
+  }
+  
+  /**
    * Used only by unit tests.
    */
   @VisibleForTesting
   synchronized void setRuntimeForTesting(Runtime runtime) {
     this.runtime = runtime;
+    this.journalSet.setRuntimeForTesting(runtime);
   }
 
   /**
@@ -796,7 +889,7 @@ public class FSEditLog  {
       editLogStream = journalSet.startLogSegment(segmentTxId);
     } catch (IOException ex) {
       throw new IOException("Unable to start log segment " +
-          segmentTxId + ": no journals successfully started.");
+          segmentTxId + ": too few journals successfully started.", ex);
     }
     
     curSegmentTxId = segmentTxId;
@@ -815,7 +908,7 @@ public class FSEditLog  {
    */
   synchronized void endCurrentLogSegment(boolean writeEndTxn) {
     LOG.info("Ending log segment " + curSegmentTxId);
-    Preconditions.checkState(state == State.IN_SEGMENT,
+    Preconditions.checkState(isSegmentOpen(),
         "Bad state: %s", state);
     
     if (writeEndTxn) {
@@ -847,6 +940,7 @@ public class FSEditLog  {
       if (editLogStream != null) {
         editLogStream.abort();
         editLogStream = null;
+        state = State.BETWEEN_LOG_SEGMENTS;
       }
     } catch (IOException e) {
       LOG.warn("All journals failed to abort", e);
@@ -856,17 +950,14 @@ public class FSEditLog  {
   /**
    * Archive any log files that are older than the given txid.
    */
-  public void purgeLogsOlderThan(final long minTxIdToKeep) {
-    synchronized (this) {
-      // synchronized to prevent findbugs warning about inconsistent
-      // synchronization. This will be JIT-ed out if asserts are
-      // off.
-      assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
-        minTxIdToKeep <= curSegmentTxId :
-        "cannot purge logs older than txid " + minTxIdToKeep +
-        " when current segment starts at " + curSegmentTxId;
-    }
+  public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
+    assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
+      minTxIdToKeep <= curSegmentTxId :
+      "cannot purge logs older than txid " + minTxIdToKeep +
+      " when current segment starts at " + curSegmentTxId;
 
+    // This could be improved to not need synchronization. But currently,
+    // journalSet is not threadsafe, so we need to synchronize this method.
     try {
       journalSet.purgeLogsOlderThan(minTxIdToKeep);
     } catch (IOException ex) {
@@ -898,8 +989,8 @@ public class FSEditLog  {
 
 
   // sets the initial capacity of the flush buffer.
-  public void setOutputBufferCapacity(int size) {
-      journalSet.setOutputBufferCapacity(size);
+  synchronized void setOutputBufferCapacity(int size) {
+    journalSet.setOutputBufferCapacity(size);
   }
 
   /**
@@ -975,32 +1066,45 @@ public class FSEditLog  {
   /**
    * Run recovery on all journals to recover any unclosed segments
    */
-  void recoverUnclosedStreams() {
+  synchronized void recoverUnclosedStreams() {
+    Preconditions.checkState(
+        state == State.BETWEEN_LOG_SEGMENTS,
+        "May not recover segments - wrong state: %s", state);
     try {
       journalSet.recoverUnfinalizedSegments();
     } catch (IOException ex) {
       // All journals have failed, it is handled in logSync.
     }
   }
+  
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId, true);
+  }
 
   /**
    * Select a list of input streams to load.
+   * 
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
+   * @param inProgressOk set to true if in-progress streams are OK
    */
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
-      long toAtLeastTxId) throws IOException {
+  public synchronized Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId, boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    EditLogInputStream stream = journalSet.getInputStream(fromTxId);
+    EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk);
     while (stream != null) {
-      fromTxId = stream.getLastTxId() + 1;
       streams.add(stream);
-      stream = journalSet.getInputStream(fromTxId);
+      // We're now looking for a higher range, so reset the fromTxId
+      fromTxId = stream.getLastTxId() + 1;
+      stream = journalSet.getInputStream(fromTxId, inProgressOk);
     }
+    
     if (fromTxId <= toAtLeastTxId) {
       closeAllStreams(streams);
-      throw new IOException("No non-corrupt logs for txid " 
-                            + fromTxId);
+      throw new IOException(String.format("Gap in transactions. Expected to "
+          + "be able to read up until at least txid %d but unable to find any "
+          + "edit logs containing txid %d", toAtLeastTxId, fromTxId));
     }
     return streams;
   }

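The state-machine comment above implies a specific call sequence when a standby NN becomes active. A sketch of that sequence using the methods added in this file (the surrounding failover code is not part of this commit, so the exact ordering is an assumption drawn from the comment):

    editLog.close();                  // OPEN_FOR_READING -> CLOSED
    editLog.initJournalsForWrite();   // CLOSED -> BETWEEN_LOG_SEGMENTS
    editLog.recoverUnclosedStreams(); // legal only in BETWEEN_LOG_SEGMENTS
    editLog.openForWrite();           // BETWEEN_LOG_SEGMENTS -> IN_SEGMENT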
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Sat Mar  3 00:42:49 2012
@@ -28,6 +28,7 @@ import java.util.EnumMap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
@@ -54,9 +56,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.io.IOUtils;
+
 import com.google.common.base.Joiner;
 
 @InterfaceAudience.Private
@@ -73,40 +78,32 @@ public class FSEditLogLoader {
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
-  throws IOException {
-    long startTime = now();
-    int numEdits = loadFSEdits(edits, true, expectedStartingTxId);
-    FSImage.LOG.info("Edits file " + edits.getName() 
-        + " of size " + edits.length() + " edits # " + numEdits 
-        + " loaded in " + (now()-startTime)/1000 + " seconds.");
-    return numEdits;
-  }
-
-  int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
-                  long expectedStartingTxId)
+  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
       throws IOException {
-    int numEdits = 0;
+    long numEdits = 0;
     int logVersion = edits.getVersion();
 
+    fsNamesys.writeLock();
     try {
+      long startTime = now();
       numEdits = loadEditRecords(logVersion, edits, false, 
                                  expectedStartingTxId);
+      FSImage.LOG.info("Edits file " + edits.getName() 
+          + " of size " + edits.length() + " edits # " + numEdits 
+          + " loaded in " + (now()-startTime)/1000 + " seconds.");
     } finally {
-      if(closeOnExit) {
-        edits.close();
-      }
+      edits.close();
+      fsNamesys.writeUnlock();
     }
     
     return numEdits;
   }
 
-  @SuppressWarnings("deprecation")
-  int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
+  long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
                       long expectedStartingTxId)
-      throws IOException {
+      throws IOException, EditLogInputException {
     FSDirectory fsDir = fsNamesys.dir;
-    int numEdits = 0;
+    long numEdits = 0;
 
     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
       new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@@ -120,9 +117,20 @@ public class FSEditLogLoader {
     long txId = expectedStartingTxId - 1;
     try {
       try {
-        FSEditLogOp op;
-        while ((op = in.readOp()) != null) {
-          recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
+        while (true) {
+          FSEditLogOp op;
+          try {
+            if ((op = in.readOp()) == null) {
+              break;
+            }
+          } catch (IOException ioe) {
+            long badTxId = txId + 1; // because txId hasn't been incremented yet
+            String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, badTxId);
+            FSImage.LOG.error(errorMessage);
+            throw new EditLogInputException(errorMessage,
+                ioe, numEdits);
+          }
+          recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
             in.getPosition();
           if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
             long expectedTxId = txId + 1;
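The readOp error path above converts a mid-stream read failure into an EditLogInputException that carries the number of edits already applied. A hypothetical caller could use that count to see how far replay progressed (getNumEditsLoaded() is assumed from the exception's constructor argument; its definition is not part of this hunk, and other exceptions are elided):

    long numLoaded;
    try {
      numLoaded = loader.loadEditRecords(logVersion, in, false, startTxId);
    } catch (EditLogInputException elie) {
      numLoaded = elie.getNumEditsLoaded(); // assumed accessor for the count
      // decide whether the remaining gap is tolerable or fatal
    }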
@@ -133,310 +141,442 @@ public class FSEditLogLoader {
             }
           }
 
-          numEdits++;
           incrOpCount(op.opCode, opCounts);
-          switch (op.opCode) {
-          case OP_ADD:
-          case OP_CLOSE: {
-            AddCloseOp addCloseOp = (AddCloseOp)op;
-
-            // versions > 0 support per file replication
-            // get name and replication
-            final short replication  = fsNamesys.getBlockManager(
-                ).adjustReplication(addCloseOp.replication);
-
-            long blockSize = addCloseOp.blockSize;
-            BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
-            for (int i = 0; i < addCloseOp.blocks.length; i++) {
-              if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
-                 && i == addCloseOp.blocks.length-1) {
-                blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
-                                                           replication);
-              } else {
-                blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
-              }
-            }
-
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            if (addCloseOp.permissions != null) {
-              permissions = addCloseOp.permissions;
-            }
-
-
-            // Older versions of HDFS does not store the block size in inode.
-            // If the file has more than one block, use the size of the
-            // first block as the blocksize. Otherwise use the default
-            // block size.
-            if (-8 <= logVersion && blockSize == 0) {
-              if (blocks.length > 1) {
-                blockSize = blocks[0].getNumBytes();
-              } else {
-                long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
-                blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
-              }
-            }
-
-
-            // The open lease transaction re-creates a file if necessary.
-            // Delete the file if it already exists.
-            if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
-                  " numblocks : " + blocks.length +
-                  " clientHolder " + addCloseOp.clientName +
-                  " clientMachine " + addCloseOp.clientMachine);
-            }
-
-            // There are four cases here:
-            // 1. OP_ADD to create a new file
-            // 2. OP_ADD to update file blocks
-            // 3. OP_ADD to open file for append
-            // 4. OP_CLOSE to close the file
-
-            // See if the file already exists
-            INodeFile oldFile = fsDir.getFileINode(addCloseOp.path);
-            if (oldFile == null) { // OP_ADD for a new file
-              assert addCloseOp.opCode == FSEditLogOpCodes.OP_ADD : 
-                "Expected opcode OP_ADD, but got " + addCloseOp.opCode;
-              fsDir.unprotectedAddFile(
-                  addCloseOp.path, permissions, blocks, replication,
-                  addCloseOp.mtime, addCloseOp.atime, blockSize,
-                  addCloseOp.clientName, addCloseOp.clientMachine);
-            } else {
-              fsDir.updateFile(oldFile, addCloseOp.path, blocks,
-                  addCloseOp.mtime, addCloseOp.atime);
-              if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) {  // OP_CLOSE
-                if (!oldFile.isUnderConstruction() &&
-                    logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
-                  // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
-                  // could show up twice in a row. But after that version, this
-                  // should be fixed, so we should treat it as an error.
-                  throw new IOException(
-                      "File is not under construction: " + addCloseOp.path);
-                }
-                fsNamesys.getBlockManager().completeBlock(
-                    oldFile, blocks.length-1, true);
-                
-                if (oldFile.isUnderConstruction()) {
-                  INodeFile newFile =
-                    ((INodeFileUnderConstruction)oldFile).convertToInodeFile();
-                  fsDir.replaceNode(addCloseOp.path, oldFile, newFile);
-                }
-              } else if(! oldFile.isUnderConstruction()) {  // OP_ADD for append
-                INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                    oldFile.getLocalNameBytes(),
-                    oldFile.getReplication(), 
-                    oldFile.getModificationTime(),
-                    oldFile.getPreferredBlockSize(),
-                    oldFile.getBlocks(),
-                    oldFile.getPermissionStatus(),
-                    addCloseOp.clientName,
-                    addCloseOp.clientMachine,
-                    null);
-                fsDir.replaceNode(addCloseOp.path, oldFile, cons);
-              }
-            }
-            // Update file lease
-            if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
-              fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
-            } else {  // Ops.OP_CLOSE
-              if (oldFile.isUnderConstruction()) {
-                fsNamesys.leaseManager.removeLease(
-                    ((INodeFileUnderConstruction)oldFile).getClientName(), addCloseOp.path);
-              }
-            }
-            break;
-          }
-          case OP_SET_REPLICATION: {
-            SetReplicationOp setReplicationOp = (SetReplicationOp)op;
-            short replication = fsNamesys.getBlockManager().adjustReplication(
-                setReplicationOp.replication);
-            fsDir.unprotectedSetReplication(setReplicationOp.path,
-                                            replication, null);
-            break;
-          }
-          case OP_CONCAT_DELETE: {
-            ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
-            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
-                concatDeleteOp.timestamp);
-            break;
-          }
-          case OP_RENAME_OLD: {
-            RenameOldOp renameOp = (RenameOldOp)op;
-            HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
-            fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
-                                      renameOp.timestamp);
-            fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
-            break;
-          }
-          case OP_DELETE: {
-            DeleteOp deleteOp = (DeleteOp)op;
-            fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
-            break;
-          }
-          case OP_MKDIR: {
-            MkdirOp mkdirOp = (MkdirOp)op;
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            if (mkdirOp.permissions != null) {
-              permissions = mkdirOp.permissions;
-            }
-
-            fsDir.unprotectedMkdir(mkdirOp.path, permissions,
-                                   mkdirOp.timestamp);
-            break;
-          }
-          case OP_SET_GENSTAMP: {
-            SetGenstampOp setGenstampOp = (SetGenstampOp)op;
-            fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
-            break;
-          }
-          case OP_SET_PERMISSIONS: {
-            SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
-            fsDir.unprotectedSetPermission(setPermissionsOp.src,
-                                           setPermissionsOp.permissions);
-            break;
-          }
-          case OP_SET_OWNER: {
-            SetOwnerOp setOwnerOp = (SetOwnerOp)op;
-            fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
-                                      setOwnerOp.groupname);
-            break;
-          }
-          case OP_SET_NS_QUOTA: {
-            SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
-            fsDir.unprotectedSetQuota(setNSQuotaOp.src,
-                                      setNSQuotaOp.nsQuota,
-                                      HdfsConstants.QUOTA_DONT_SET);
-            break;
-          }
-          case OP_CLEAR_NS_QUOTA: {
-            ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
-            fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
-                                      HdfsConstants.QUOTA_RESET,
-                                      HdfsConstants.QUOTA_DONT_SET);
-            break;
-          }
-
-          case OP_SET_QUOTA:
-            SetQuotaOp setQuotaOp = (SetQuotaOp)op;
-            fsDir.unprotectedSetQuota(setQuotaOp.src,
-                                      setQuotaOp.nsQuota,
-                                      setQuotaOp.dsQuota);
-            break;
-
-          case OP_TIMES: {
-            TimesOp timesOp = (TimesOp)op;
-
-            fsDir.unprotectedSetTimes(timesOp.path,
-                                      timesOp.mtime,
-                                      timesOp.atime, true);
-            break;
-          }
-          case OP_SYMLINK: {
-            SymlinkOp symlinkOp = (SymlinkOp)op;
-            fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
-                                     symlinkOp.mtime, symlinkOp.atime,
-                                     symlinkOp.permissionStatus);
-            break;
-          }
-          case OP_RENAME: {
-            RenameOp renameOp = (RenameOp)op;
-
-            HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
-            fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
-                                      renameOp.timestamp, renameOp.options);
-            fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
-            break;
-          }
-          case OP_GET_DELEGATION_TOKEN: {
-            GetDelegationTokenOp getDelegationTokenOp
-              = (GetDelegationTokenOp)op;
-
-            fsNamesys.getDelegationTokenSecretManager()
-              .addPersistedDelegationToken(getDelegationTokenOp.token,
-                                           getDelegationTokenOp.expiryTime);
-            break;
-          }
-          case OP_RENEW_DELEGATION_TOKEN: {
-            RenewDelegationTokenOp renewDelegationTokenOp
-              = (RenewDelegationTokenOp)op;
-            fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
-                                           renewDelegationTokenOp.expiryTime);
-            break;
-          }
-          case OP_CANCEL_DELEGATION_TOKEN: {
-            CancelDelegationTokenOp cancelDelegationTokenOp
-              = (CancelDelegationTokenOp)op;
-            fsNamesys.getDelegationTokenSecretManager()
-                .updatePersistedTokenCancellation(
-                    cancelDelegationTokenOp.token);
-            break;
-          }
-          case OP_UPDATE_MASTER_KEY: {
-            UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
-            fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedMasterKey(updateMasterKeyOp.key);
-            break;
-          }
-          case OP_REASSIGN_LEASE: {
-            ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
-
-            Lease lease = fsNamesys.leaseManager.getLease(
-                reassignLeaseOp.leaseHolder);
-            INodeFileUnderConstruction pendingFile =
-                (INodeFileUnderConstruction) fsDir.getFileINode(
-                    reassignLeaseOp.path);
-            fsNamesys.reassignLeaseInternal(lease,
-                reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
-            break;
-          }
-          case OP_START_LOG_SEGMENT:
-          case OP_END_LOG_SEGMENT: {
-            // no data in here currently.
-            break;
-          }
-          case OP_DATANODE_ADD:
-          case OP_DATANODE_REMOVE:
-            break;
-          default:
-            throw new IOException("Invalid operation read " + op.opCode);
+          try {
+            applyEditLogOp(op, fsDir, logVersion);
+          } catch (Throwable t) {
+            // Catch Throwable because in the case of a truly corrupt edits log, any
+            // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
+            String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, txId);
+            FSImage.LOG.error(errorMessage);
+            throw new IOException(errorMessage, t);
           }
+          numEdits++;
         }
-
       } catch (IOException ex) {
         check203UpgradeFailure(logVersion, ex);
       } finally {
         if(closeOnExit)
           in.close();
       }
-    } catch (Throwable t) {
-      // Catch Throwable because in the case of a truly corrupt edits log, any
-      // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
-      StringBuilder sb = new StringBuilder();
-      sb.append("Error replaying edit log at offset " + in.getPosition());
-      sb.append("On transaction ID ").append(txId);
-      if (recentOpcodeOffsets[0] != -1) {
-        Arrays.sort(recentOpcodeOffsets);
-        sb.append("\nRecent opcode offsets:");
-        for (long offset : recentOpcodeOffsets) {
-          if (offset != -1) {
-            sb.append(' ').append(offset);
-          }
-        }
-      }
-      String errorMessage = sb.toString();
-      FSImage.LOG.error(errorMessage);
-      throw new IOException(errorMessage, t);
     } finally {
       fsDir.writeUnlock();
       fsNamesys.writeUnlock();
-    }
-    if (FSImage.LOG.isDebugEnabled()) {
-      dumpOpCounts(opCounts);
+      if (FSImage.LOG.isDebugEnabled()) {
+        dumpOpCounts(opCounts);
+      }
     }
     return numEdits;
   }
+  
+  @SuppressWarnings("deprecation")
+  private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
+      int logVersion) throws IOException {
+    switch (op.opCode) {
+    case OP_ADD: {
+      AddCloseOp addCloseOp = (AddCloseOp)op;
+      if (FSNamesystem.LOG.isDebugEnabled()) {
+        FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+            " numblocks : " + addCloseOp.blocks.length +
+            " clientHolder " + addCloseOp.clientName +
+            " clientMachine " + addCloseOp.clientMachine);
+      }
+      // There are three cases here:
+      // 1. OP_ADD to create a new file
+      // 2. OP_ADD to update file blocks
+      // 3. OP_ADD to open file for append
+
+      // See if the file already exists (persistBlocks call)
+      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+      INodeFile newFile = oldFile;
+      if (oldFile == null) { // this is OP_ADD on a new file (case 1)
+        // versions > 0 support per file replication
+        // get name and replication
+        final short replication  = fsNamesys.getBlockManager(
+            ).adjustReplication(addCloseOp.replication);
+        PermissionStatus permissions = fsNamesys.getUpgradePermission();
+        if (addCloseOp.permissions != null) {
+          permissions = addCloseOp.permissions;
+        }
+        long blockSize = addCloseOp.blockSize;
 
+        // Versions of HDFS prior to 0.17 may log an OP_ADD transaction
+        // which includes blocks in it. When we update the minimum
+        // upgrade version to something more recent than 0.17, we can
+        // simplify this code by asserting that OP_ADD transactions
+        // don't have any blocks.
+        
+        // Older versions of HDFS do not store the block size in the inode.
+        // If the file has more than one block, use the size of the
+        // first block as the blocksize. Otherwise use the default
+        // block size.
+        if (-8 <= logVersion && blockSize == 0) {
+          if (addCloseOp.blocks.length > 1) {
+            blockSize = addCloseOp.blocks[0].getNumBytes();
+          } else {
+            long first = ((addCloseOp.blocks.length == 1)?
+                addCloseOp.blocks[0].getNumBytes(): 0);
+            blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+          }
+        }
+
+        // add to the file tree
+        newFile = (INodeFile)fsDir.unprotectedAddFile(
+            addCloseOp.path, permissions,
+            replication, addCloseOp.mtime,
+            addCloseOp.atime, blockSize,
+            true, addCloseOp.clientName, addCloseOp.clientMachine);
+        fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
+
+      } else { // This is OP_ADD on an existing file
+        if (!oldFile.isUnderConstruction()) {
+          // This is case 3: a call to append() on an already-closed file.
+          if (FSNamesystem.LOG.isDebugEnabled()) {
+            FSNamesystem.LOG.debug("Reopening an already-closed file " +
+                "for append");
+          }
+          fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
+              addCloseOp.clientName, addCloseOp.clientMachine, null,
+              false);
+          newFile = getINodeFile(fsDir, addCloseOp.path);
+        }
+      }
+      // Fall-through for case 2.
+      // Regardless of whether it's a new file or an updated file,
+      // update the block list.
+      
+      // Update the salient file attributes.
+      newFile.setAccessTime(addCloseOp.atime);
+      newFile.setModificationTimeForce(addCloseOp.mtime);
+      updateBlocks(fsDir, addCloseOp, newFile);
+      break;
+    }
+    case OP_CLOSE: {
+      AddCloseOp addCloseOp = (AddCloseOp)op;
+      
+      if (FSNamesystem.LOG.isDebugEnabled()) {
+        FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+            " numblocks : " + addCloseOp.blocks.length +
+            " clientHolder " + addCloseOp.clientName +
+            " clientMachine " + addCloseOp.clientMachine);
+      }
+
+      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+      if (oldFile == null) {
+        throw new IOException("Operation trying to close non-existent file " +
+            addCloseOp.path);
+      }
+      
+      // Update in-memory data structures
+      updateBlocks(fsDir, addCloseOp, oldFile);
+
+      // Now close the file
+      if (!oldFile.isUnderConstruction() &&
+          logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
+        // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
+        // could show up twice in a row. From that version on the bug is
+        // fixed, so a duplicate close is treated as an error.
+        throw new IOException(
+            "File is not under construction: " + addCloseOp.path);
+      }
+      // One might expect that you could use removeLease(holder, path) here,
+      // but OP_CLOSE doesn't serialize the holder. So, remove by path.
+      if (oldFile.isUnderConstruction()) {
+        INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
+        fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
+        INodeFile newFile = ucFile.convertToInodeFile();
+        fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
+      }
+      break;
+    }
+    case OP_UPDATE_BLOCKS: {
+      UpdateBlocksOp updateOp = (UpdateBlocksOp)op;
+      if (FSNamesystem.LOG.isDebugEnabled()) {
+        FSNamesystem.LOG.debug(op.opCode + ": " + updateOp.path +
+            " numblocks : " + updateOp.blocks.length);
+      }
+      INodeFile oldFile = getINodeFile(fsDir, updateOp.path);
+      if (oldFile == null) {
+        throw new IOException(
+            "Operation trying to update blocks in non-existent file " +
+            updateOp.path);
+      }
+      
+      // Update in-memory data structures
+      updateBlocks(fsDir, updateOp, oldFile);
+      break;
+    }
+      
+    case OP_SET_REPLICATION: {
+      SetReplicationOp setReplicationOp = (SetReplicationOp)op;
+      short replication = fsNamesys.getBlockManager().adjustReplication(
+          setReplicationOp.replication);
+      fsDir.unprotectedSetReplication(setReplicationOp.path,
+                                      replication, null);
+      break;
+    }
+    case OP_CONCAT_DELETE: {
+      ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
+      fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
+          concatDeleteOp.timestamp);
+      break;
+    }
+    case OP_RENAME_OLD: {
+      RenameOldOp renameOp = (RenameOldOp)op;
+      HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+      fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+                                renameOp.timestamp);
+      fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
+      break;
+    }
+    case OP_DELETE: {
+      DeleteOp deleteOp = (DeleteOp)op;
+      fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
+      break;
+    }
+    case OP_MKDIR: {
+      MkdirOp mkdirOp = (MkdirOp)op;
+      PermissionStatus permissions = fsNamesys.getUpgradePermission();
+      if (mkdirOp.permissions != null) {
+        permissions = mkdirOp.permissions;
+      }
+
+      fsDir.unprotectedMkdir(mkdirOp.path, permissions,
+                             mkdirOp.timestamp);
+      break;
+    }
+    case OP_SET_GENSTAMP: {
+      SetGenstampOp setGenstampOp = (SetGenstampOp)op;
+      fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
+      break;
+    }
+    case OP_SET_PERMISSIONS: {
+      SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
+      fsDir.unprotectedSetPermission(setPermissionsOp.src,
+                                     setPermissionsOp.permissions);
+      break;
+    }
+    case OP_SET_OWNER: {
+      SetOwnerOp setOwnerOp = (SetOwnerOp)op;
+      fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
+                                setOwnerOp.groupname);
+      break;
+    }
+    case OP_SET_NS_QUOTA: {
+      SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
+      fsDir.unprotectedSetQuota(setNSQuotaOp.src,
+                                setNSQuotaOp.nsQuota,
+                                HdfsConstants.QUOTA_DONT_SET);
+      break;
+    }
+    case OP_CLEAR_NS_QUOTA: {
+      ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
+      fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
+                                HdfsConstants.QUOTA_RESET,
+                                HdfsConstants.QUOTA_DONT_SET);
+      break;
+    }
+
+    case OP_SET_QUOTA:
+      SetQuotaOp setQuotaOp = (SetQuotaOp)op;
+      fsDir.unprotectedSetQuota(setQuotaOp.src,
+                                setQuotaOp.nsQuota,
+                                setQuotaOp.dsQuota);
+      break;
+
+    case OP_TIMES: {
+      TimesOp timesOp = (TimesOp)op;
+
+      fsDir.unprotectedSetTimes(timesOp.path,
+                                timesOp.mtime,
+                                timesOp.atime, true);
+      break;
+    }
+    case OP_SYMLINK: {
+      SymlinkOp symlinkOp = (SymlinkOp)op;
+      fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
+                               symlinkOp.mtime, symlinkOp.atime,
+                               symlinkOp.permissionStatus);
+      break;
+    }
+    case OP_RENAME: {
+      RenameOp renameOp = (RenameOp)op;
+
+      HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+      fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+                                renameOp.timestamp, renameOp.options);
+      fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
+      break;
+    }
+    case OP_GET_DELEGATION_TOKEN: {
+      GetDelegationTokenOp getDelegationTokenOp
+        = (GetDelegationTokenOp)op;
+
+      fsNamesys.getDelegationTokenSecretManager()
+        .addPersistedDelegationToken(getDelegationTokenOp.token,
+                                     getDelegationTokenOp.expiryTime);
+      break;
+    }
+    case OP_RENEW_DELEGATION_TOKEN: {
+      RenewDelegationTokenOp renewDelegationTokenOp
+        = (RenewDelegationTokenOp)op;
+      fsNamesys.getDelegationTokenSecretManager()
+        .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
+                                     renewDelegationTokenOp.expiryTime);
+      break;
+    }
+    case OP_CANCEL_DELEGATION_TOKEN: {
+      CancelDelegationTokenOp cancelDelegationTokenOp
+        = (CancelDelegationTokenOp)op;
+      fsNamesys.getDelegationTokenSecretManager()
+          .updatePersistedTokenCancellation(
+              cancelDelegationTokenOp.token);
+      break;
+    }
+    case OP_UPDATE_MASTER_KEY: {
+      UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
+      fsNamesys.getDelegationTokenSecretManager()
+        .updatePersistedMasterKey(updateMasterKeyOp.key);
+      break;
+    }
+    case OP_REASSIGN_LEASE: {
+      ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
+
+      Lease lease = fsNamesys.leaseManager.getLease(
+          reassignLeaseOp.leaseHolder);
+      INodeFileUnderConstruction pendingFile =
+          (INodeFileUnderConstruction) fsDir.getFileINode(
+              reassignLeaseOp.path);
+      fsNamesys.reassignLeaseInternal(lease,
+          reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
+      break;
+    }
+    case OP_START_LOG_SEGMENT:
+    case OP_END_LOG_SEGMENT: {
+      // no data in here currently.
+      break;
+    }
+    case OP_DATANODE_ADD:
+    case OP_DATANODE_REMOVE:
+      break;
+    default:
+      throw new IOException("Invalid operation read " + op.opCode);
+    }
+  }
+  
+  private static String formatEditLogReplayError(EditLogInputStream in,
+      long[] recentOpcodeOffsets, long txid) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Error replaying edit log at offset " + in.getPosition());
+    sb.append(" on transaction ID ").append(txid);
+    if (recentOpcodeOffsets[0] != -1) {
+      Arrays.sort(recentOpcodeOffsets);
+      sb.append("\nRecent opcode offsets:");
+      for (long offset : recentOpcodeOffsets) {
+        if (offset != -1) {
+          sb.append(' ').append(offset);
+        }
+      }
+    }
+    return sb.toString();
+  }
+  
+  private static INodeFile getINodeFile(FSDirectory fsDir, String path)
+      throws IOException {
+    INode inode = fsDir.getINode(path);
+    if (inode != null) {
+      if (!(inode instanceof INodeFile)) {
+        throw new IOException("Operation trying to get non-file " + path);
+      }
+    }
+    return (INodeFile)inode;
+  }
+  
+  /**
+   * Update in-memory data structures with new block information.
+   * @throws IOException
+   */
+  private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
+      INodeFile file) throws IOException {
+    // Update its block list
+    BlockInfo[] oldBlocks = file.getBlocks();
+    Block[] newBlocks = op.getBlocks();
+    String path = op.getPath();
+    
+    // Are we only updating the last block's gen stamp?
+    boolean isGenStampUpdate = oldBlocks.length == newBlocks.length;
+    
+    // First, update blocks in common
+    for (int i = 0; i < oldBlocks.length && i < newBlocks.length; i++) {
+      BlockInfo oldBlock = oldBlocks[i];
+      Block newBlock = newBlocks[i];
+      
+      boolean isLastBlock = i == newBlocks.length - 1;
+      if (oldBlock.getBlockId() != newBlock.getBlockId() ||
+          (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && 
+              !(isGenStampUpdate && isLastBlock))) {
+        throw new IOException("Mismatched block IDs or generation stamps, " + 
+            "attempting to replace block " + oldBlock + " with " + newBlock +
+            " as block # " + i + "/" + newBlocks.length + " of " +
+            path);
+      }
+      
+      oldBlock.setNumBytes(newBlock.getNumBytes());
+      boolean changeMade =
+        oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
+      oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
+      
+      if (oldBlock instanceof BlockInfoUnderConstruction &&
+          (!isLastBlock || op.shouldCompleteLastBlock())) {
+        changeMade = true;
+        fsNamesys.getBlockManager().forceCompleteBlock(
+            (INodeFileUnderConstruction)file,
+            (BlockInfoUnderConstruction)oldBlock);
+      }
+      if (changeMade) {
+        // The state or gen-stamp of the block has changed. So, we may be
+        // able to process some messages from datanodes that we previously
+        // were unable to process.
+        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
+      }
+    }
+    
+    if (newBlocks.length < oldBlocks.length) {
+      // We're removing a block from the file, e.g. abandonBlock(...)
+      if (!file.isUnderConstruction()) {
+        throw new IOException("Trying to remove a block from file " +
+            path + " which is not under construction.");
+      }
+      if (newBlocks.length != oldBlocks.length - 1) {
+        throw new IOException("Trying to remove more than one block from file "
+            + path);
+      }
+      fsDir.unprotectedRemoveBlock(path,
+          (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
+    } else if (newBlocks.length > oldBlocks.length) {
+      // We're adding blocks
+      for (int i = oldBlocks.length; i < newBlocks.length; i++) {
+        Block newBlock = newBlocks[i];
+        BlockInfo newBI;
+        if (!op.shouldCompleteLastBlock()) {
+          // TODO: shouldn't this only be true for the last block?
+          // what about an old-version fsync() where fsync isn't called
+          // until several blocks in?
+          newBI = new BlockInfoUnderConstruction(
+              newBlock, file.getReplication());
+        } else {
+          // OP_CLOSE should add finalized blocks. This code path
+          // is only executed when loading edits written by prior
+          // versions of Hadoop. Current versions always log
+          // OP_ADD operations as each block is allocated.
+          newBI = new BlockInfo(newBlock, file.getReplication());
+        }
+        fsNamesys.getBlockManager().addINode(newBI, file);
+        file.addBlock(newBI);
+        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
+      }
+    }
+  }
 
   private static void dumpOpCounts(
       EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {
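
The updateBlocks() method added above reconciles an inode's existing block
list with the list carried by an OP_ADD, OP_CLOSE, or OP_UPDATE_BLOCKS
record, branching purely on the relative lengths of the two arrays. A
minimal standalone sketch of that classification; BlockListDelta, Plan, and
classify() are invented for illustration and are not part of the patch:

    import org.apache.hadoop.hdfs.protocol.Block;

    // Hypothetical helper mirroring how updateBlocks() above branches on
    // the relative lengths of the old and new block arrays.
    class BlockListDelta {
      enum Plan { REFRESH_IN_PLACE, REMOVE_LAST_BLOCK, APPEND_NEW_BLOCKS }

      static Plan classify(Block[] oldBlocks, Block[] newBlocks) {
        if (newBlocks.length == oldBlocks.length) {
          // Same count: only lengths and, for the last block, the gen
          // stamp may legitimately differ (e.g. pipeline recovery).
          return Plan.REFRESH_IN_PLACE;
        }
        if (newBlocks.length == oldBlocks.length - 1) {
          // Exactly one block shorter: an abandonBlock() on a file that
          // is still under construction.
          return Plan.REMOVE_LAST_BLOCK;
        }
        if (newBlocks.length > oldBlocks.length) {
          // Longer: blocks were allocated since the last logged list.
          return Plan.APPEND_NEW_BLOCKS;
        }
        throw new IllegalArgumentException(
            "cannot remove more than one block per edit");
      }
    }
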
@@ -517,19 +657,21 @@ public class FSEditLogLoader {
       FSImage.LOG.debug("Caught exception after reading " + numValid +
           " ops from " + in + " while determining its valid length.", t);
     }
-    return new EditLogValidation(lastPos, firstTxId, lastTxId);
+    return new EditLogValidation(lastPos, firstTxId, lastTxId, false);
   }
   
   static class EditLogValidation {
-    private long validLength;
-    private long startTxId;
-    private long endTxId;
+    private final long validLength;
+    private final long startTxId;
+    private final long endTxId;
+    private final boolean corruptionDetected;
      
-    EditLogValidation(long validLength, 
-                      long startTxId, long endTxId) {
+    EditLogValidation(long validLength, long startTxId, long endTxId,
+        boolean corruptionDetected) {
       this.validLength = validLength;
       this.startTxId = startTxId;
       this.endTxId = endTxId;
+      this.corruptionDetected = corruptionDetected;
     }
     
     long getValidLength() { return validLength; }
@@ -545,6 +687,8 @@ public class FSEditLogLoader {
       }
       return (endTxId - startTxId) + 1;
     }
+    
+    boolean hasCorruptHeader() { return corruptionDetected; }
   }
 
   /**

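For pre-0.17 edit logs (layout version >= -8) an OP_ADD record may carry a
zero block size, so the loader above infers one from the logged blocks. A
sketch of that inference rule as a standalone method; BlockSizeInference
and inferBlockSize() are names invented here, and the real logic stays
inlined in FSEditLogLoader:

    import org.apache.hadoop.hdfs.protocol.Block;

    class BlockSizeInference {
      static long inferBlockSize(Block[] blocks, long defaultBlockSize) {
        if (blocks.length > 1) {
          // Multi-block file: every block except the last has the same
          // length, so the first block's length is the block size.
          return blocks[0].getNumBytes();
        }
        // Zero or one block: a lone block may be a short final block, so
        // never report less than the configured default.
        long first = (blocks.length == 1) ? blocks[0].getNumBytes() : 0;
        return Math.max(defaultBlockSize, first);
      }
    }
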
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Sat Mar  3 00:42:49 2012
@@ -101,6 +101,7 @@ public abstract class FSEditLogOp {
                       new LogSegmentOp(OP_START_LOG_SEGMENT));
         instances.put(OP_END_LOG_SEGMENT,
                       new LogSegmentOp(OP_END_LOG_SEGMENT));
+        instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
         return instances;
       }
   };
@@ -128,8 +129,14 @@ public abstract class FSEditLogOp {
   abstract void writeFields(DataOutputStream out)
       throws IOException;
 
+  static interface BlockListUpdatingOp {
+    Block[] getBlocks();
+    String getPath();
+    boolean shouldCompleteLastBlock();
+  }
+  
   @SuppressWarnings("unchecked")
-  static abstract class AddCloseOp extends FSEditLogOp {
+  static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
     int length;
     String path;
     short replication;
@@ -151,6 +158,10 @@ public abstract class FSEditLogOp {
       this.path = path;
       return (T)this;
     }
+    
+    public String getPath() {
+      return path;
+    }
 
     <T extends AddCloseOp> T setReplication(short replication) {
       this.replication = replication;
@@ -176,6 +187,10 @@ public abstract class FSEditLogOp {
       this.blocks = blocks;
       return (T)this;
     }
+    
+    public Block[] getBlocks() {
+      return blocks;
+    }
 
     <T extends AddCloseOp> T setPermissionStatus(PermissionStatus permissions) {
       this.permissions = permissions;
@@ -347,6 +362,10 @@ public abstract class FSEditLogOp {
       return (AddOp)opInstances.get().get(OP_ADD);
     }
 
+    public boolean shouldCompleteLastBlock() {
+      return false;
+    }
+
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
@@ -365,6 +384,10 @@ public abstract class FSEditLogOp {
       return (CloseOp)opInstances.get().get(OP_CLOSE);
     }
 
+    public boolean shouldCompleteLastBlock() {
+      return true;
+    }
+
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
@@ -373,6 +396,68 @@ public abstract class FSEditLogOp {
       return builder.toString();
     }
   }
+  
+  static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
+    String path;
+    Block[] blocks;
+    
+    private UpdateBlocksOp() {
+      super(OP_UPDATE_BLOCKS);
+    }
+    
+    static UpdateBlocksOp getInstance() {
+      return (UpdateBlocksOp)opInstances.get()
+        .get(OP_UPDATE_BLOCKS);
+    }
+    
+    UpdateBlocksOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+    
+    public String getPath() {
+      return path;
+    }
+
+    UpdateBlocksOp setBlocks(Block[] blocks) {
+      this.blocks = blocks;
+      return this;
+    }
+    
+    public Block[] getBlocks() {
+      return blocks;
+    }
+
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeCompactBlockArray(blocks, out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      path = FSImageSerialization.readString(in);
+      this.blocks = FSImageSerialization.readCompactBlockArray(
+          in, logVersion);
+    }
+
+    @Override
+    public boolean shouldCompleteLastBlock() {
+      return false;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("UpdateBlocksOp [path=")
+        .append(path)
+        .append(", blocks=")
+        .append(Arrays.toString(blocks))
+        .append("]");
+      return sb.toString();
+    }
+  }
 
   static class SetReplicationOp extends FSEditLogOp {
     String path;

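The new BlockListUpdatingOp interface lets the loader handle OP_ADD,
OP_CLOSE, and OP_UPDATE_BLOCKS through one code path: each op exposes a
path, a block array, and whether the last block should be finalized. A
small illustrative consumer, assuming same-package access to the
package-private interface; BlockOpLogger is not part of the patch:

    import org.apache.hadoop.hdfs.protocol.Block;

    class BlockOpLogger {
      static void logUpdate(FSEditLogOp.BlockListUpdatingOp op) {
        Block[] blocks = op.getBlocks();
        String tail = op.shouldCompleteLastBlock()
            ? "finalize last block"                  // CloseOp
            : "last block stays under construction"; // AddOp, UpdateBlocksOp
        System.out.println(op.getPath() + ": " + blocks.length
            + " block(s), " + tail);
      }
    }
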
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Sat Mar  3 00:42:49 2012
@@ -55,7 +55,8 @@ public enum FSEditLogOpCodes {
   OP_UPDATE_MASTER_KEY          ((byte) 21),
   OP_REASSIGN_LEASE             ((byte) 22),
   OP_END_LOG_SEGMENT            ((byte) 23),
-  OP_START_LOG_SEGMENT          ((byte) 24);
+  OP_START_LOG_SEGMENT          ((byte) 24),
+  OP_UPDATE_BLOCKS              ((byte) 25);
 
   private byte opCode;
 

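OP_UPDATE_BLOCKS takes the next free opcode byte, 25. Readers must map the
serialized byte back to the enum; a sketch of that reverse lookup as a
method one could add to the enum, assuming getOpCode() is the accessor for
the private opCode field (fromByteValue is a name invented here):

    static FSEditLogOpCodes fromByteValue(byte code) {
      // Linear scan over values(); avoids assuming any lookup-table API.
      for (FSEditLogOpCodes op : FSEditLogOpCodes.values()) {
        if (op.getOpCode() == code) {
          return op; // e.g. (byte) 25 -> OP_UPDATE_BLOCKS
        }
      }
      throw new IllegalArgumentException("Unknown opcode: " + code);
    }
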
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Mar  3 00:42:49 2012
@@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -68,7 +70,7 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class FSImage implements Closeable {
-  protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
+  public static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
@@ -112,7 +114,8 @@ public class FSImage implements Closeabl
    * @throws IOException if directories are invalid.
    */
   protected FSImage(Configuration conf,
-                    Collection<URI> imageDirs, Collection<URI> editsDirs)
+                    Collection<URI> imageDirs,
+                    List<URI> editsDirs)
       throws IOException {
     this.conf = conf;
 
@@ -123,6 +126,12 @@ public class FSImage implements Closeabl
     }
 
     this.editLog = new FSEditLog(conf, storage, editsDirs);
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
+      editLog.initJournalsForWrite();
+    } else {
+      editLog.initSharedJournalsForRead();
+    }
     
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
   }
@@ -251,6 +260,11 @@ public class FSImage implements Closeabl
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, storage);
+        String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+        if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) {
+          throw new IOException("Cannot start an HA namenode with name dirs " +
+              "that need recovery. Dir: " + sd + " state: " + curState);
+        }
         // sd is locked but not opened
         switch(curState) {
         case NON_EXISTENT:
@@ -324,9 +338,9 @@ public class FSImage implements Closeabl
         File prevDir = sd.getPreviousDir();
         File tmpDir = sd.getPreviousTmp();
         assert curDir.exists() : "Current directory must exist.";
-        assert !prevDir.exists() : "prvious directory must not exist.";
-        assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
-        assert !editLog.isOpen() : "Edits log must not be open.";
+        assert !prevDir.exists() : "previous directory must not exist.";
+        assert !tmpDir.exists() : "previous.tmp directory must not exist.";
+        assert !editLog.isSegmentOpen() : "Edits log must not be open.";
 
         // rename current to tmp
         NNStorage.rename(curDir, tmpDir);
@@ -469,7 +483,7 @@ public class FSImage implements Closeabl
   void doImportCheckpoint(FSNamesystem target) throws IOException {
     Collection<URI> checkpointDirs =
       FSImage.getCheckpointDirs(conf, null);
-    Collection<URI> checkpointEditsDirs =
+    List<URI> checkpointEditsDirs =
       FSImage.getCheckpointEditsDirs(conf, null);
 
     if (checkpointDirs == null || checkpointDirs.isEmpty()) {
@@ -519,11 +533,9 @@ public class FSImage implements Closeabl
     return editLog;
   }
 
-  void openEditLog() throws IOException {
+  void openEditLogForWrite() throws IOException {
     assert editLog != null : "editLog must be initialized";
-    Preconditions.checkState(!editLog.isOpen(),
-        "edit log should not yet be open");
-    editLog.open();
+    editLog.openForWrite();
     storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
   };
   
@@ -564,12 +576,19 @@ public class FSImage implements Closeabl
 
     Iterable<EditLogInputStream> editStreams = null;
 
-    editLog.recoverUnclosedStreams();
+    if (editLog.isOpenForWrite()) {
+      // We only want to recover streams if we're going into Active mode.
+      editLog.recoverUnclosedStreams();
+    }
 
     if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
                                getLayoutVersion())) {
+      // If we're open for write, we're either non-HA or we're the active NN, so
+      // we better be able to load all the edits. If we're the standby NN, it's
+      // OK to not be able to read all of edits right now.
+      long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
       editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
-                                               inspector.getMaxSeenTxId());
+          toAtLeastTxId, false);
     } else {
       editStreams = FSImagePreTransactionalStorageInspector
         .getEditLogStreams(storage);
@@ -644,12 +663,12 @@ public class FSImage implements Closeabl
    * Load the specified list of edit files into the image.
    * @return the number of transactions loaded
    */
-  protected long loadEdits(Iterable<EditLogInputStream> editStreams,
-                           FSNamesystem target) throws IOException {
+  public long loadEdits(Iterable<EditLogInputStream> editStreams,
+      FSNamesystem target) throws IOException, EditLogInputException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
 
     long startingTxId = getLastAppliedTxId() + 1;
-    int numLoaded = 0;
+    long numLoaded = 0;
 
     try {    
       FSEditLogLoader loader = new FSEditLogLoader(target);
@@ -657,17 +676,26 @@ public class FSImage implements Closeabl
       // Load latest edits
       for (EditLogInputStream editIn : editStreams) {
         LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
-        int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
-        startingTxId += thisNumLoaded;
-        numLoaded += thisNumLoaded;
-        lastAppliedTxId += thisNumLoaded;
+        long thisNumLoaded = 0;
+        try {
+          thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+        } catch (EditLogInputException elie) {
+          thisNumLoaded = elie.getNumEditsLoaded();
+          throw elie;
+        } finally {
+          // Update lastAppliedTxId even in case of error, since some ops may
+          // have been successfully applied before the error.
+          lastAppliedTxId = startingTxId + thisNumLoaded - 1;
+          startingTxId += thisNumLoaded;
+          numLoaded += thisNumLoaded;
+        }
       }
     } finally {
       FSEditLog.closeAllStreams(editStreams);
+      // update the counts
+      target.dir.updateCountForINodeWithQuota();   
     }
-
-    // update the counts
-    target.dir.updateCountForINodeWithQuota();    
+    
     return numLoaded;
   }
 
@@ -688,8 +716,7 @@ public class FSImage implements Closeabl
   
   /**
    * Load in the filesystem image from file. It's a big list of
-   * filenames and blocks.  Return whether we should
-   * "re-save" and consolidate the edit-logs
+   * filenames and blocks.
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
       FSNamesystem target) throws IOException {
@@ -786,16 +813,16 @@ public class FSImage implements Closeabl
    * Save the contents of the FS image to a new image file in each of the
    * current storage directories.
    */
-  synchronized void saveNamespace(FSNamesystem source) throws IOException {
+  public synchronized void saveNamespace(FSNamesystem source) throws IOException {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
-    boolean editLogWasOpen = editLog.isOpen();
+    boolean editLogWasOpen = editLog.isSegmentOpen();
     
     if (editLogWasOpen) {
       editLog.endCurrentLogSegment(true);
     }
-    long imageTxId = editLog.getLastWrittenTxId();
+    long imageTxId = getLastAppliedOrWrittenTxId();
     try {
       saveFSImageInAllDirs(source, imageTxId);
       storage.writeAll();
@@ -812,7 +839,7 @@ public class FSImage implements Closeabl
     
   }
   
-  void cancelSaveNamespace(String reason)
+  public void cancelSaveNamespace(String reason)
       throws InterruptedException {
     SaveNamespaceContext ctx = curSaveNamespaceContext;
     if (ctx != null) {
@@ -1061,7 +1088,7 @@ public class FSImage implements Closeabl
     return Util.stringCollectionAsURIs(dirNames);
   }
 
-  static Collection<URI> getCheckpointEditsDirs(Configuration conf,
+  static List<URI> getCheckpointEditsDirs(Configuration conf,
       String defaultName) {
     Collection<String> dirNames = 
       conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
@@ -1095,4 +1122,16 @@ public class FSImage implements Closeabl
     return lastAppliedTxId;
   }
 
+  public long getLastAppliedOrWrittenTxId() {
+    return Math.max(lastAppliedTxId,
+        editLog != null ? editLog.getLastWrittenTxId() : 0);
+  }
+
+  public void updateLastAppliedTxIdFromWritten() {
+    this.lastAppliedTxId = editLog.getLastWrittenTxId();
+  }
+
+  public synchronized long getMostRecentCheckpointTxId() {
+    return storage.getMostRecentCheckpointTxId();
+  }
 }

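The reworked loadEdits() above moves the transaction-count bookkeeping into
a finally block, so edits applied before a mid-stream failure are still
reflected in lastAppliedTxId. A simplified sketch of that pattern;
EditSource is a hypothetical stand-in for EditLogInputStream plus
loadFSEdits(), and the real code additionally recovers a partial count from
EditLogInputException before rethrowing:

    import java.io.IOException;
    import java.util.List;

    class EditReplaySketch {
      interface EditSource {
        long apply(long startTxId) throws IOException; // txns applied
      }

      static long replayAll(List<EditSource> streams, long lastAppliedTxId)
          throws IOException {
        long startingTxId = lastAppliedTxId + 1;
        long numLoaded = 0;
        for (EditSource s : streams) {
          long thisNumLoaded = 0;
          try {
            thisNumLoaded = s.apply(startingTxId);
          } finally {
            // Some edits may have landed before a failure; advance the
            // counters so a resume starts at the right transaction.
            startingTxId += thisNumLoaded;
            numLoaded += thisNumLoaded;
          }
        }
        return numLoaded;
      }
    }
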
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Sat Mar  3 00:42:49 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Static utility functions for serializing various pieces of data in the correct
@@ -277,6 +278,49 @@ public class FSImageSerialization {
       ustr.getLength(), (byte) Path.SEPARATOR_CHAR);
   }
 
+
+  /**
+   * Write an array of blocks as compactly as possible. This uses
+   * delta-encoding for the generation stamp and size, following
+   * the principle that genstamp increases relatively slowly,
+   * and size is equal for all but the last block of a file.
+   */
+  public static void writeCompactBlockArray(
+      Block[] blocks, DataOutputStream out) throws IOException {
+    WritableUtils.writeVInt(out, blocks.length);
+    Block prev = null;
+    for (Block b : blocks) {
+      long szDelta = b.getNumBytes() -
+          (prev != null ? prev.getNumBytes() : 0);
+      long gsDelta = b.getGenerationStamp() -
+          (prev != null ? prev.getGenerationStamp() : 0);
+      out.writeLong(b.getBlockId()); // blockid is random
+      WritableUtils.writeVLong(out, szDelta);
+      WritableUtils.writeVLong(out, gsDelta);
+      prev = b;
+    }
+  }
+  
+  public static Block[] readCompactBlockArray(
+      DataInputStream in, int logVersion) throws IOException {
+    int num = WritableUtils.readVInt(in);
+    if (num < 0) {
+      throw new IOException("Invalid block array length: " + num);
+    }
+    Block prev = null;
+    Block[] ret = new Block[num];
+    for (int i = 0; i < num; i++) {
+      long id = in.readLong();
+      long sz = WritableUtils.readVLong(in) +
+          ((prev != null) ? prev.getNumBytes() : 0);
+      long gs = WritableUtils.readVLong(in) +
+          ((prev != null) ? prev.getGenerationStamp() : 0);
+      ret[i] = new Block(id, sz, gs);
+      prev = ret[i];
+    }
+    return ret;
+  }
+
   /**
    * DatanodeImage is used to store persistent information
    * about datanodes into the fsImage.



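The compact block array format writes the 8-byte block ID verbatim but
vlong-encodes size and generation stamp as deltas against the previous
block, so for a typical file, where every block but the last has the same
length and gen stamps climb slowly, each middle block's two deltas cost one
byte apiece. A small demo whose encode loop mirrors writeCompactBlockArray()
above; the class name, block IDs, and sizes are invented for illustration:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.io.WritableUtils;

    public class CompactBlockDemo {
      public static void main(String[] args) throws IOException {
        // Two full 128MB blocks plus a short tail, gen stamps one apart.
        Block[] blocks = {
            new Block(7001L, 134217728L, 1000L),
            new Block(7002L, 134217728L, 1001L),
            new Block(7003L, 4096L, 1002L),
        };
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        WritableUtils.writeVInt(out, blocks.length);
        Block prev = null;
        for (Block b : blocks) {
          out.writeLong(b.getBlockId()); // IDs are random: no delta helps
          WritableUtils.writeVLong(out,
              b.getNumBytes() - (prev != null ? prev.getNumBytes() : 0));
          WritableUtils.writeVLong(out, b.getGenerationStamp()
              - (prev != null ? prev.getGenerationStamp() : 0));
          prev = b;
        }
        // The middle block's size delta (0) and gen-stamp delta (1) each
        // encode as a single vlong byte.
        System.out.println("encoded " + blocks.length + " blocks in "
            + bos.size() + " bytes");
      }
    }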