hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1296534 [6/11] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/ src...
Date Sat, 03 Mar 2012 00:43:00 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Mar  3 00:42:49 2012
@@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
@@ -47,10 +49,15 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
@@ -68,6 +75,7 @@ import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
@@ -80,6 +88,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -108,7 +117,10 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -147,9 +159,18 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
+import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
+import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
+import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -157,6 +178,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -170,13 +192,12 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import com.google.common.base.Preconditions;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
@@ -194,7 +215,7 @@ import com.google.common.annotations.Vis
 @Metrics(context="dfs")
 public class FSNamesystem implements Namesystem, FSClusterStats,
     FSNamesystemMBean, NameNodeMXBean {
-  static final Log LOG = LogFactory.getLog(FSNamesystem.class);
+  public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
 
   private static final ThreadLocal<StringBuilder> auditBuffer =
     new ThreadLocal<StringBuilder>() {
@@ -243,14 +264,18 @@ public class FSNamesystem implements Nam
   static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
   static int BLOCK_DELETION_INCREMENT = 1000;
   private boolean isPermissionEnabled;
+  private boolean persistBlocks;
   private UserGroupInformation fsOwner;
   private String supergroup;
   private PermissionStatus defaultPermission;
+  private boolean standbyShouldCheckpoint;
   
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
     TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
   private DelegationTokenSecretManager dtSecretManager;
+  private boolean alwaysUseDelegationTokensForTests;
+  
 
   //
   // Stores the correct file name hierarchy
@@ -264,7 +289,6 @@ public class FSNamesystem implements Nam
 
   LeaseManager leaseManager = new LeaseManager(this); 
 
-  Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
   
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@@ -300,7 +324,26 @@ public class FSNamesystem implements Nam
   // lock to protect FSNamesystem.
   private ReentrantReadWriteLock fsLock;
 
-  
+  /**
+   * Used when this NN is in standby state to read from the shared edit log.
+   */
+  private EditLogTailer editLogTailer = null;
+
+  /**
+   * Used when this NN is in standby state to perform checkpoints.
+   */
+  private StandbyCheckpointer standbyCheckpointer;
+
+  /**
+   * Reference to the NN's HAContext object. This is only set once
+   * {@link #startCommonServices(Configuration, HAContext)} is called. 
+   */
+  private HAContext haContext;
+
+  private boolean haEnabled;
+
+  private final Configuration conf;
+    
   /**
    * Instantiates an FSNamesystem loaded from the image and edits
    * directories specified in the passed Configuration.
@@ -310,9 +353,10 @@ public class FSNamesystem implements Nam
    * @return an FSNamesystem which contains the loaded namespace
    * @throws IOException if loading fails
    */
-  public static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
+  public static FSNamesystem loadFromDisk(Configuration conf)
+    throws IOException {
     Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
-    Collection<URI> namespaceEditsDirs = 
+    List<URI> namespaceEditsDirs = 
       FSNamesystem.getNamespaceEditsDirs(conf);
 
     if (namespaceDirs.size() == 1) {
@@ -329,7 +373,9 @@ public class FSNamesystem implements Nam
 
     long loadStart = now();
     StartupOption startOpt = NameNode.getStartupOption(conf);
-    namesystem.loadFSImage(startOpt, fsImage);
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    namesystem.loadFSImage(startOpt, fsImage,
+      HAUtil.isHAEnabled(conf, nameserviceId));
     long timeTakenToLoadFSImage = now() - loadStart;
     LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
     NameNode.getNameNodeMetrics().setFsImageLoadTime(
@@ -348,6 +394,7 @@ public class FSNamesystem implements Nam
    * @throws IOException on bad configuration
    */
   FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
+    this.conf = conf;
     try {
       initialize(conf, fsImage);
     } catch(IOException e) {
@@ -375,7 +422,7 @@ public class FSNamesystem implements Nam
     this.safeMode = new SafeModeInfo(conf);
   }
 
-  void loadFSImage(StartupOption startOpt, FSImage fsImage)
+  void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
       throws IOException {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
@@ -385,43 +432,71 @@ public class FSNamesystem implements Nam
       startOpt = StartupOption.REGULAR;
     }
     boolean success = false;
+    writeLock();
     try {
-      if (fsImage.recoverTransitionRead(startOpt, this)) {
+      // We shouldn't be calling saveNamespace if we've come up in standby state.
+      if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) {
         fsImage.saveNamespace(this);
       }
-      fsImage.openEditLog();
+      // This will start a new log segment and write to the seen_txid file, so
+      // we shouldn't do it when coming up in standby state
+      if (!haEnabled) {
+        fsImage.openEditLogForWrite();
+      }
       
       success = true;
     } finally {
       if (!success) {
         fsImage.close();
       }
+      writeUnlock();
     }
     dir.imageLoadComplete();
   }
 
-  void activateSecretManager() throws IOException {
+  private void startSecretManager() {
     if (dtSecretManager != null) {
-      dtSecretManager.startThreads();
+      try {
+        dtSecretManager.startThreads();
+      } catch (IOException e) {
+        // Inability to start secret manager
+        // can't be recovered from.
+        throw new RuntimeException(e);
+      }
     }
   }
   
-  /**
-   * Activate FSNamesystem daemons.
+  private void startSecretManagerIfNecessary() {
+    boolean shouldRun = shouldUseDelegationTokens() &&
+      !isInSafeMode() && getEditLog().isOpenForWrite();
+    boolean running = dtSecretManager.isRunning();
+    if (shouldRun && !running) {
+      startSecretManager();
+    }
+  }
+
+  private void stopSecretManager() {
+    if (dtSecretManager != null) {
+      dtSecretManager.stopThreads();
+    }
+  }
+  
+  /** 
+   * Start services common to both active and standby states
+   * @param haContext 
+   * @throws IOException
    */
-  void activate(Configuration conf) throws IOException {
+  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
     this.registerMBean(); // register the MBean for the FSNamesystemState
-
     writeLock();
+    this.haContext = haContext;
     try {
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
-
+      assert safeMode != null &&
+        !safeMode.isPopulatingReplQueues();
       setBlockTotal();
       blockManager.activate(conf);
-
-      this.lmthread = new Daemon(leaseManager.new Monitor());
-      lmthread.start();
       this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
       nnrmthread.start();
     } finally {
@@ -431,24 +506,169 @@ public class FSNamesystem implements Nam
     registerMXBean();
     DefaultMetricsSystem.instance().register(this);
   }
+  
+  /** 
+   * Stop services common to both active and standby states
+   * @throws IOException
+   */
+  void stopCommonServices() {
+    writeLock();
+    try {
+      if (blockManager != null) blockManager.close();
+      if (nnrmthread != null) nnrmthread.interrupt();
+    } finally {
+      writeUnlock();
+    }
+  }
+  
+  /**
+   * Start services required in active state
+   * @throws IOException
+   */
+  void startActiveServices() throws IOException {
+    LOG.info("Starting services required for active state");
+    writeLock();
+    try {
+      FSEditLog editLog = dir.fsImage.getEditLog();
+      
+      if (!editLog.isOpenForWrite()) {
+        // During startup, we're already open for write during initialization.
+        editLog.initJournalsForWrite();
+        // May need to recover
+        editLog.recoverUnclosedStreams();
+        
+        LOG.info("Catching up to latest edits from old active before " +
+            "taking over writer role in edits logs.");
+        editLogTailer.catchupDuringFailover();
+        
+        LOG.info("Reprocessing replication and invalidation queues...");
+        blockManager.getDatanodeManager().markAllDatanodesStale();
+        blockManager.clearQueues();
+        blockManager.processAllPendingDNMessages();
+        blockManager.processMisReplicatedBlocks();
+        
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NameNode metadata after re-processing " +
+              "replication and invalidation queues during failover:\n" +
+              metaSaveAsString());
+        }
+        
+        long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
+        LOG.info("Will take over writing edit logs at txnid " + 
+            nextTxId);
+        editLog.setNextTxId(nextTxId);
+
+        dir.fsImage.editLog.openForWrite();
+      }
+      if (haEnabled) {
+        // Renew all of the leases before becoming active.
+        // This is because, while we were in standby mode,
+        // the leases weren't getting renewed on this NN.
+        // Give them all a fresh start here.
+        leaseManager.renewAllLeases();
+      }
+      leaseManager.startMonitor();
+      startSecretManagerIfNecessary();
+    } finally {
+      writeUnlock();
+    }
+  }
 
-  public static Collection<URI> getNamespaceDirs(Configuration conf) {
-    return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
+  private boolean shouldUseDelegationTokens() {
+    return UserGroupInformation.isSecurityEnabled() ||
+      alwaysUseDelegationTokensForTests;
+  }
+
+  /** 
+   * Stop services required in active state
+   * @throws InterruptedException
+   */
+  void stopActiveServices() {
+    LOG.info("Stopping services started for active state");
+    writeLock();
+    try {
+      stopSecretManager();
+      if (leaseManager != null) {
+        leaseManager.stopMonitor();
+      }
+      if (dir != null && dir.fsImage != null) {
+        if (dir.fsImage.editLog != null) {
+          dir.fsImage.editLog.close();
+        }
+        // Update the fsimage with the last txid that we wrote
+        // so that the tailer starts from the right spot.
+        dir.fsImage.updateLastAppliedTxIdFromWritten();
+      }
+    } finally {
+      writeUnlock();
+    }
   }
   
-  public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
-    Collection<URI> editsDirs = getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
-    if (editsDirs.isEmpty()) {
-      // If this is the case, no edit dirs have been explicitly configured.
-      // Image dirs are to be used for edits too.
-      return getNamespaceDirs(conf);
-    } else {
-      return editsDirs;
+  /** Start services required in standby state */
+  void startStandbyServices() {
+    LOG.info("Starting services required for standby state");
+    if (!dir.fsImage.editLog.isOpenForRead()) {
+      // During startup, we're already open for read.
+      dir.fsImage.editLog.initSharedJournalsForRead();
+    }
+    editLogTailer = new EditLogTailer(this);
+    editLogTailer.start();
+    if (standbyShouldCheckpoint) {
+      standbyCheckpointer = new StandbyCheckpointer(conf, this);
+      standbyCheckpointer.start();
+    }
+  }
+
+
+  /**
+   * Called while the NN is in Standby state, but just about to be
+   * asked to enter Active state. This cancels any checkpoints
+   * currently being taken.
+   */
+  void prepareToStopStandbyServices() throws ServiceFailedException {
+    if (standbyCheckpointer != null) {
+      standbyCheckpointer.cancelAndPreventCheckpoints();
+    }
+  }
+
+  /** Stop services required in standby state */
+  void stopStandbyServices() throws IOException {
+    LOG.info("Stopping services started for standby state");
+    if (standbyCheckpointer != null) {
+      standbyCheckpointer.stop();
+    }
+    if (editLogTailer != null) {
+      editLogTailer.stop();
+    }
+    if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {
+      dir.fsImage.editLog.close();
+    }
+  }
+  
+  
+  void checkOperation(OperationCategory op) throws StandbyException {
+    if (haContext != null) {
+      // null in some unit tests
+      haContext.checkOperation(op);
     }
   }
   
+  public static Collection<URI> getNamespaceDirs(Configuration conf) {
+    return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
+  }
+
+  /**
+   * Get all edits dirs which are required. If any shared edits dirs are
+   * configured, these are also included in the set of required dirs.
+   * 
+   * @param conf the HDFS configuration.
+   * @return all required dirs.
+   */
   public static Collection<URI> getRequiredNamespaceEditsDirs(Configuration conf) {
-    return getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY);
+    Set<URI> ret = new HashSet<URI>();
+    ret.addAll(getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY));
+    ret.addAll(getSharedEditsDirs(conf));
+    return ret;
   }
 
   private static Collection<URI> getStorageDirs(Configuration conf,
@@ -481,6 +701,75 @@ public class FSNamesystem implements Nam
     return Util.stringCollectionAsURIs(dirNames);
   }
 
+  /**
+   * Return an ordered list of edits directories to write to.
+   * The list is ordered such that all shared edits directories
+   * are ordered before non-shared directories, and any duplicates
+   * are removed. The order they are specified in the configuration
+   * is retained.
+   * @return ordered List of edits directories, shared directories first.
+   * @throws IOException if multiple shared edits directories are configured
+   */
+  public static List<URI> getNamespaceEditsDirs(Configuration conf)
+      throws IOException {
+    // Use a LinkedHashSet so that order is maintained while we de-dup
+    // the entries.
+    LinkedHashSet<URI> editsDirs = new LinkedHashSet<URI>();
+    
+    List<URI> sharedDirs = getSharedEditsDirs(conf);
+
+    // Fail until multiple shared edits directories are supported (HDFS-2782)
+    if (sharedDirs.size() > 1) {
+      throw new IOException(
+          "Multiple shared edits directories are not yet supported");
+    }
+
+    // First add the shared edits dirs. It's critical that the shared dirs
+    // are added first, since JournalSet syncs them in the order they are listed,
+    // and we need to make sure all edits are in place in the shared storage
+    // before they are replicated locally. See HDFS-2874.
+    for (URI dir : sharedDirs) {
+      if (!editsDirs.add(dir)) {
+        LOG.warn("Edits URI " + dir + " listed multiple times in " + 
+            DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
+      }
+    }
+    
+    // Now add the non-shared dirs.
+    for (URI dir : getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY)) {
+      if (!editsDirs.add(dir)) {
+        LOG.warn("Edits URI " + dir + " listed multiple times in " + 
+            DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " and " +
+            DFS_NAMENODE_EDITS_DIR_KEY + ". Ignoring duplicates.");
+      }
+    }
+
+    if (editsDirs.isEmpty()) {
+      // If this is the case, no edit dirs have been explicitly configured.
+      // Image dirs are to be used for edits too.
+      return Lists.newArrayList(getNamespaceDirs(conf));
+    } else {
+      return Lists.newArrayList(editsDirs);
+    }
+  }
+  
+  /**
+   * Returns edit directories that are shared between primary and secondary.
+   * @param conf
+   * @return List of shared edits directories; empty if none are configured.
+   */
+  public static List<URI> getSharedEditsDirs(Configuration conf) {
+    // don't use getStorageDirs here, because we want an empty default
+    // rather than the dir in /tmp
+    Collection<String> dirNames = conf.getTrimmedStringCollection(
+        DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+    return Util.stringCollectionAsURIs(dirNames);
+  }
+  
+  public Configuration getConf() {
+    return conf;
+  }
+
   @Override
   public void readLock() {
     this.fsLock.readLock().lock();
@@ -494,6 +783,10 @@ public class FSNamesystem implements Nam
     this.fsLock.writeLock().lock();
   }
   @Override
+  public void writeLockInterruptibly() throws InterruptedException {
+    this.fsLock.writeLock().lockInterruptibly();
+  }
+  @Override
   public void writeUnlock() {
     this.fsLock.writeLock().unlock();
   }
@@ -526,6 +819,26 @@ public class FSNamesystem implements Nam
                                                DFS_PERMISSIONS_ENABLED_DEFAULT);
     LOG.info("supergroup=" + supergroup);
     LOG.info("isPermissionEnabled=" + isPermissionEnabled);
+
+    this.persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
+                                         DFS_PERSIST_BLOCKS_DEFAULT);
+    // block allocation has to be persisted in HA using a shared edits directory
+    // so that the standby has up-to-date namespace information
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);  
+    this.persistBlocks |= haEnabled && HAUtil.usesSharedEditsDir(conf);
+    
+    // Sanity check the HA-related config.
+    if (nameserviceId != null) {
+      LOG.info("Determined nameservice ID: " + nameserviceId);
+    }
+    LOG.info("HA Enabled: " + haEnabled);
+    if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
+      LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
+      throw new IOException("Invalid configuration: a shared edits dir " +
+          "must not be specified if HA is not enabled.");
+    }
+
     short filePermission = (short)conf.getInt(DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
                                               DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
@@ -546,6 +859,16 @@ public class FSNamesystem implements Nam
         DFS_SUPPORT_APPEND_DEFAULT);
 
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
+    
+    this.standbyShouldCheckpoint = conf.getBoolean(
+        DFS_HA_STANDBY_CHECKPOINTS_KEY,
+        DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
+    
+    // For testing purposes, allow the DT secret manager to be started regardless
+    // of whether security is enabled.
+    alwaysUseDelegationTokensForTests = 
+      conf.getBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
   }
 
   /**
@@ -566,7 +889,7 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   * Version of {@see #getNamespaceInfo()} that is not protected by a lock.
+   * Version of {@link #getNamespaceInfo()} that is not protected by a lock.
    */
   NamespaceInfo unprotectedGetNamespaceInfo() {
     return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
@@ -583,23 +906,16 @@ public class FSNamesystem implements Nam
   void close() {
     fsRunning = false;
     try {
-      if (blockManager != null) blockManager.close();
+      stopCommonServices();
       if (smmthread != null) smmthread.interrupt();
-      if (dtSecretManager != null) dtSecretManager.stopThreads();
-      if (nnrmthread != null) nnrmthread.interrupt();
-    } catch (Exception e) {
-      LOG.warn("Exception shutting down FSNamesystem", e);
     } finally {
       // using finally to ensure we also wait for lease daemon
       try {
-        if (lmthread != null) {
-          lmthread.interrupt();
-          lmthread.join(3000);
-        }
+        stopActiveServices();
+        stopStandbyServices();
         if (dir != null) {
           dir.close();
         }
-      } catch (InterruptedException ie) {
       } catch (IOException ie) {
         LOG.error("Error closing FSDirectory", ie);
         IOUtils.cleanup(LOG, dir);
@@ -611,6 +927,18 @@ public class FSNamesystem implements Nam
   public boolean isRunning() {
     return fsRunning;
   }
+  
+  @Override
+  public boolean isInStandbyState() {
+    if (haContext == null || haContext.getState() == null) {
+      // We're still starting up. In this case, if HA is
+      // on for the cluster, we always start in standby. Otherwise
+      // start in active.
+      return haEnabled;
+    }
+  
+    return haContext.getState() instanceof StandbyState;
+  }
 
   /**
    * Dump all metadata into specified file
@@ -622,14 +950,7 @@ public class FSNamesystem implements Nam
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
           true)));
-  
-      long totalInodes = this.dir.totalInodes();
-      long totalBlocks = this.getBlocksTotal();
-      out.println(totalInodes + " files and directories, " + totalBlocks
-          + " blocks = " + (totalInodes + totalBlocks) + " total");
-
-      blockManager.metaSave(out);
-
+      metaSave(out);
       out.flush();
       out.close();
     } finally {
@@ -637,11 +958,31 @@ public class FSNamesystem implements Nam
     }
   }
 
+  private void metaSave(PrintWriter out) {
+    assert hasWriteLock();
+    long totalInodes = this.dir.totalInodes();
+    long totalBlocks = this.getBlocksTotal();
+    out.println(totalInodes + " files and directories, " + totalBlocks
+        + " blocks = " + (totalInodes + totalBlocks) + " total");
+
+    blockManager.metaSave(out);
+  }
+
+  private String metaSaveAsString() {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    metaSave(pw);
+    pw.flush();
+    return sw.toString();
+  }
+  
+
   long getDefaultBlockSize() {
     return serverDefaults.getBlockSize();
   }
 
-  FsServerDefaults getServerDefaults() {
+  FsServerDefaults getServerDefaults() throws StandbyException {
+    checkOperation(OperationCategory.READ);
     return serverDefaults;
   }
 
@@ -668,6 +1009,8 @@ public class FSNamesystem implements Nam
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
@@ -697,6 +1040,8 @@ public class FSNamesystem implements Nam
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
@@ -787,13 +1132,14 @@ public class FSNamesystem implements Nam
       }  else { // second attempt is with  write lock
         writeLock(); // writelock is needed to set accesstime
       }
+      try {
+        checkOperation(OperationCategory.READ);
 
-      // if the namenode is in safemode, then do not update access time
-      if (isInSafeMode()) {
-        doAccessTime = false;
-      }
+        // if the namenode is in safemode, then do not update access time
+        if (isInSafeMode()) {
+          doAccessTime = false;
+        }
 
-      try {
         long now = now();
         INodeFile inode = dir.getFileINode(src);
         if (inode == null) {
@@ -861,6 +1207,7 @@ public class FSNamesystem implements Nam
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
@@ -992,6 +1339,8 @@ public class FSNamesystem implements Nam
     }
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
         checkPathAccess(src, FsAction.WRITE);
@@ -1022,6 +1371,8 @@ public class FSNamesystem implements Nam
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (!createParent) {
         verifyParentDir(link);
       }
@@ -1091,6 +1442,8 @@ public class FSNamesystem implements Nam
     final boolean isFile;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
@@ -1121,6 +1474,7 @@ public class FSNamesystem implements Nam
       throws IOException, UnresolvedLinkException {
     readLock();
     try {
+      checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkTraverse(filename);
       }
@@ -1163,6 +1517,8 @@ public class FSNamesystem implements Nam
       FileNotFoundException, ParentNotDirectoryException, IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       startFileInternal(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
     } finally {
@@ -1266,30 +1622,8 @@ public class FSNamesystem implements Nam
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
       if (append && myFile != null) {
-        //
-        // Replace current node with a INodeUnderConstruction.
-        // Recreate in-memory lease record.
-        //
-        INodeFile node = (INodeFile) myFile;
-        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                        node.getLocalNameBytes(),
-                                        node.getReplication(),
-                                        node.getModificationTime(),
-                                        node.getPreferredBlockSize(),
-                                        node.getBlocks(),
-                                        node.getPermissionStatus(),
-                                        holder,
-                                        clientMachine,
-                                        clientNode);
-        dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.getClientName(), src);
-        
-        // convert last block to under-construction
-        LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
-
-        // add append file record to log, record lease, etc.
-        getEditLog().logOpenFile(src, cons);
-        return ret;
+        return prepareFileForWrite(
+            src, myFile, holder, clientMachine, clientNode, true);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1320,6 +1654,45 @@ public class FSNamesystem implements Nam
     }
     return null;
   }
+  
+  /**
+   * Replace current node with an INodeFileUnderConstruction.
+   * Recreate in-memory lease record.
+   * 
+   * @param src path to the file
+   * @param file existing file object
+   * @param leaseHolder identifier of the lease holder on this file
+   * @param clientMachine identifier of the client machine
+   * @param clientNode if the client is collocated with a DN, that DN's descriptor
+   * @param writeToEditLog whether to persist this change to the edit log
+   * @return the last block locations if the block is partial or null otherwise
+   * @throws UnresolvedLinkException
+   * @throws IOException
+   */
+  public LocatedBlock prepareFileForWrite(String src, INode file,
+      String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
+      boolean writeToEditLog)
+      throws UnresolvedLinkException, IOException {
+    INodeFile node = (INodeFile) file;
+    INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                    node.getLocalNameBytes(),
+                                    node.getReplication(),
+                                    node.getModificationTime(),
+                                    node.getPreferredBlockSize(),
+                                    node.getBlocks(),
+                                    node.getPermissionStatus(),
+                                    leaseHolder,
+                                    clientMachine,
+                                    clientNode);
+    dir.replaceNode(src, node, cons);
+    leaseManager.addLease(cons.getClientName(), src);
+    
+    LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
+    if (writeToEditLog) {
+      getEditLog().logOpenFile(src, cons);
+    }
+    return ret;
+  }
 
   /**
    * Recover lease;
@@ -1336,6 +1709,8 @@ public class FSNamesystem implements Nam
       throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException(
             "Cannot recover the lease of " + src, safeMode);
@@ -1455,6 +1830,8 @@ public class FSNamesystem implements Nam
     LocatedBlock lb = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       lb = startFileInternal(src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, blockManager.maxReplication, 0);
@@ -1519,6 +1896,8 @@ public class FSNamesystem implements Nam
 
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
@@ -1552,6 +1931,7 @@ public class FSNamesystem implements Nam
     // Allocate a new block and record it in the INode. 
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
@@ -1570,10 +1950,14 @@ public class FSNamesystem implements Nam
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
-      }      
+      }
+      dir.persistBlocks(src, pendingFile);
     } finally {
       writeUnlock();
     }
+    if (persistBlocks) {
+      getEditLog().logSync();
+    }
 
     // Create next block
     LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
@@ -1594,6 +1978,7 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> chosen;
     readLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       //check safe mode
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add datanode; src=" + src
@@ -1635,6 +2020,7 @@ public class FSNamesystem implements Nam
       UnresolvedLinkException, IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       //
       // Remove the block from the pending creates list
       //
@@ -1652,10 +2038,15 @@ public class FSNamesystem implements Nam
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                       + b + " is removed from pendingCreates");
       }
-      return true;
+      dir.persistBlocks(src, file);
     } finally {
       writeUnlock();
     }
+    if (persistBlocks) {
+      getEditLog().logSync();
+    }
+
+    return true;
   }
   
   // make sure that we still have the lease on this file.
@@ -1705,6 +2096,8 @@ public class FSNamesystem implements Nam
     boolean success = false;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       success = completeFileInternal(src, holder, 
         ExtendedBlock.getLocalBlock(last));
     } finally {
@@ -1764,12 +2157,15 @@ public class FSNamesystem implements Nam
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
   private Block allocateBlock(String src, INode[] inodes,
-      DatanodeDescriptor targets[]) throws QuotaExceededException {
+      DatanodeDescriptor targets[]) throws QuotaExceededException,
+      SafeModeException {
     assert hasWriteLock();
     Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(DFSUtil.getRandom().nextLong());
     }
+    // Increment the generation stamp for every new block.
+    nextGenerationStamp();
     b.setGenerationStamp(getGenerationStamp());
     b = dir.addBlock(src, inodes, b, targets);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
@@ -1841,6 +2237,8 @@ public class FSNamesystem implements Nam
     }
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       status = renameToInternal(src, dst);
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
@@ -1896,6 +2294,8 @@ public class FSNamesystem implements Nam
     }
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       renameToInternal(src, dst, options);
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
@@ -1973,6 +2373,7 @@ public class FSNamesystem implements Nam
 
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
@@ -2028,9 +2429,45 @@ public class FSNamesystem implements Nam
     if (blocks == null) {
       return;
     }
-    for(Block b : blocks) {
+    
+    // In the case that we are a Standby tailing edits from the
+    // active while in safe-mode, we need to track the total number
+    // of blocks and safe blocks in the system.
+    boolean trackBlockCounts = isSafeModeTrackingBlocks();
+    int numRemovedComplete = 0, numRemovedSafe = 0;
+
+    for (Block b : blocks) {
+      if (trackBlockCounts) {
+        BlockInfo bi = blockManager.getStoredBlock(b);
+        if (bi.isComplete()) {
+          numRemovedComplete++;
+          if (bi.numNodes() >= blockManager.minReplication) {
+            numRemovedSafe++;
+          }
+        }
+      }
       blockManager.removeBlock(b);
     }
+    if (trackBlockCounts) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
+            "decreasing safeBlocks by " + numRemovedSafe +
+            ", totalBlocks by " + numRemovedComplete);
+      }
+      adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
+    }
+  }
+
+  /**
+   * @see SafeModeInfo#shouldIncrementallyTrackBlocks
+   */
+  private boolean isSafeModeTrackingBlocks() {
+    if (!haEnabled) {
+      // Never track blocks incrementally in non-HA code.
+      return false;
+    }
+    SafeModeInfo sm = this.safeMode;
+    return sm != null && sm.shouldIncrementallyTrackBlocks();
   }
 
   /**
@@ -2045,11 +2482,15 @@ public class FSNamesystem implements Nam
    *
    * @return object containing information regarding the file
    *         or null if file not found
+   * @throws StandbyException 
    */
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
-    throws AccessControlException, UnresolvedLinkException {
+    throws AccessControlException, UnresolvedLinkException,
+           StandbyException {
     readLock();
     try {
+      checkOperation(OperationCategory.READ);
+
       if (!DFSUtil.isValidName(src)) {
         throw new InvalidPathException("Invalid file name: " + src);
       }
@@ -2073,6 +2514,8 @@ public class FSNamesystem implements Nam
     }
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       status = mkdirsInternal(src, permissions, createParent);
     } finally {
       writeUnlock();
@@ -2127,9 +2570,11 @@ public class FSNamesystem implements Nam
   }
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
-      FileNotFoundException, UnresolvedLinkException {
+      FileNotFoundException, UnresolvedLinkException, StandbyException {
     readLock();
     try {
+      checkOperation(OperationCategory.READ);
+
       if (isPermissionEnabled) {
         checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
       }
@@ -2148,6 +2593,7 @@ public class FSNamesystem implements Nam
       throws IOException, UnresolvedLinkException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set quota on " + path, safeMode);
       }
@@ -2172,6 +2618,7 @@ public class FSNamesystem implements Nam
                                   + src + " for " + clientName);
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot fsync file " + src, safeMode);
       }
@@ -2381,6 +2828,10 @@ public class FSNamesystem implements Nam
     String src = "";
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      // If a DN tries to commit to the standby, the recovery will
+      // fail, and the next retry will succeed on the new NN.
+  
       if (isInSafeMode()) {
         throw new SafeModeException(
           "Cannot commitBlockSynchronization while in safe mode",
@@ -2455,8 +2906,8 @@ public class FSNamesystem implements Nam
         //remove lease, close file
         finalizeINodeFileUnderConstruction(src, pendingFile);
       } else if (supportAppends) {
-        // If this commit does not want to close the file, persist
-        // blocks only if append is supported 
+        // If this commit does not want to close the file, persist blocks
+        // only if append is supported or we're explicitly told to
         dir.persistBlocks(src, pendingFile);
       }
     } finally {
@@ -2481,6 +2932,8 @@ public class FSNamesystem implements Nam
   void renewLease(String holder) throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
       }
@@ -2508,6 +2961,8 @@ public class FSNamesystem implements Nam
     DirectoryListing dl;
     readLock();
     try {
+      checkOperation(OperationCategory.READ);
+
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
           checkPathAccess(src, FsAction.READ_EXECUTE);
@@ -2586,7 +3041,7 @@ public class FSNamesystem implements Nam
    * @return an array of datanode commands 
    * @throws IOException
    */
-  DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+  HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes) 
         throws IOException {
@@ -2597,28 +3052,40 @@ public class FSNamesystem implements Nam
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
           xceiverCount, maxTransfer, failedVolumes);
-      if (cmds != null) {
-        return cmds;
-      }
-
-      //check distributed upgrade
-      DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
-      if (cmd != null) {
-        return new DatanodeCommand[] {cmd};
+      if (cmds == null || cmds.length == 0) {
+        DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
+        if (cmd != null) {
+          cmds = new DatanodeCommand[] {cmd};
+        }
       }
-      return null;
+      
+      return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
     } finally {
       readUnlock();
     }
   }
 
+  private NNHAStatusHeartbeat createHaStatusHeartbeat() {
+    HAState state = haContext.getState();
+    NNHAStatusHeartbeat.State hbState;
+    if (state instanceof ActiveState) {
+      hbState = NNHAStatusHeartbeat.State.ACTIVE;
+    } else if (state instanceof StandbyState) {
+      hbState = NNHAStatusHeartbeat.State.STANDBY;      
+    } else {
+      throw new AssertionError("Invalid state: " + state.getClass());
+    }
+    return new NNHAStatusHeartbeat(hbState,
+        getFSImage().getLastAppliedOrWrittenTxId());
+  }
+
   /**
    * Returns whether or not there were available resources at the last check of
    * resources.
    *
    * @return true if there were sufficient resources available, false otherwise.
    */
-  private boolean nameNodeHasResourcesAvailable() {
+  boolean nameNodeHasResourcesAvailable() {
     return hasResourcesAvailable;
   }
 
@@ -2626,7 +3093,7 @@ public class FSNamesystem implements Nam
    * Perform resource checks and cache the results.
    * @throws IOException
    */
-  private void checkAvailableResources() throws IOException {
+  void checkAvailableResources() {
     Preconditions.checkState(nnResourceChecker != null,
         "nnResourceChecker not initialized");
     hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
@@ -2665,11 +3132,11 @@ public class FSNamesystem implements Nam
     }
   }
   
-  FSImage getFSImage() {
+  public FSImage getFSImage() {
     return dir.fsImage;
   }
 
-  FSEditLog getEditLog() {
+  public FSEditLog getEditLog() {
     return getFSImage().getEditLog();
   }    
 
@@ -2701,8 +3168,12 @@ public class FSNamesystem implements Nam
   @Metric({"TransactionsSinceLastLogRoll",
       "Number of transactions since last edit log roll"})
   public long getTransactionsSinceLastLogRoll() {
-    return (getEditLog().getLastWrittenTxId() -
-        getEditLog().getCurSegmentTxId()) + 1;
+    if (isInStandbyState()) {
+      return 0;
+    } else {
+      return getEditLog().getLastWrittenTxId() -
+        getEditLog().getCurSegmentTxId() + 1;
+    }
   }
   
   @Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
@@ -2931,6 +3402,8 @@ public class FSNamesystem implements Nam
     boolean initializedReplQueues = false;
     /** Was safemode entered automatically because available resources were low. */
     private boolean resourcesLow = false;
+    /** Should safemode adjust its block totals as blocks come in */
+    private boolean shouldIncrementallyTrackBlocks = false;
     
     /**
      * Creates SafeModeInfo when the name node enters
@@ -2959,6 +3432,18 @@ public class FSNamesystem implements Nam
     }
 
     /**
+     * In the HA case, the StandbyNode can be in safemode while the namespace
+     * is modified by the edit log tailer. In this case, the number of total
+     * blocks changes as edits are processed (eg blocks are added and deleted).
+     * However, we don't want to do the incremental tracking during the
+     * startup-time loading process -- only once the initial total has been
+     * set after the image has been loaded.
+     */
+    private boolean shouldIncrementallyTrackBlocks() {
+      return shouldIncrementallyTrackBlocks;
+    }
+
+    /**
      * Creates SafeModeInfo when safe mode is entered manually, or because
      * available resources are low.
      *
@@ -2986,13 +3471,7 @@ public class FSNamesystem implements Nam
      * @return true if in safe mode
      */
     private synchronized boolean isOn() {
-      try {
-        assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
-          + "Total num of blocks, active blocks, or "
-          + "total safe blocks don't match.";
-      } catch(IOException e) {
-        System.err.print(StringUtils.stringifyException(e));
-      }
+      doConsistencyCheck();
       return this.reached >= 0;
     }
       
@@ -3031,8 +3510,9 @@ public class FSNamesystem implements Nam
           return;
         }
       }
-      // if not done yet, initialize replication queues
-      if (!isPopulatingReplQueues()) {
+      // if not done yet, initialize replication queues.
+      // In the standby, do not populate repl queues
+      if (!isPopulatingReplQueues() && !isInStandbyState()) {
         initializeReplQueues();
       }
       long timeInSafemode = now() - systemStart;
@@ -3051,6 +3531,8 @@ public class FSNamesystem implements Nam
           + nt.getNumOfLeaves() + " datanodes");
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
           + blockManager.numOfUnderReplicatedBlocks() + " blocks");
+
+      startSecretManagerIfNecessary();
     }
 
     /**
@@ -3073,7 +3555,7 @@ public class FSNamesystem implements Nam
      * initializing replication queues.
      */
     private synchronized boolean canInitializeReplQueues() {
-      return blockSafe >= blockReplQueueThreshold;
+      return !isInStandbyState() && blockSafe >= blockReplQueueThreshold;
     }
       
     /** 
@@ -3106,6 +3588,9 @@ public class FSNamesystem implements Nam
      * Check and trigger safe mode if needed. 
      */
     private void checkMode() {
+      // Have to have write-lock since leaving safemode initializes
+      // repl queues, which requires write lock
+      assert hasWriteLock();
       if (needEnter()) {
         enter();
         // check if we are ready to initialize replication queues
@@ -3145,6 +3630,13 @@ public class FSNamesystem implements Nam
       this.blockThreshold = (int) (blockTotal * threshold);
       this.blockReplQueueThreshold = 
         (int) (blockTotal * replQueueThreshold);
+      if (haEnabled) {
+        // After we initialize the block count, any further namespace
+        // modifications done while in safe mode need to keep track
+        // of the number of total blocks in the system.
+        this.shouldIncrementallyTrackBlocks = true;
+      }
+      
       checkMode();
     }
       
@@ -3154,9 +3646,10 @@ public class FSNamesystem implements Nam
      * @param replication current replication 
      */
     private synchronized void incrementSafeBlockCount(short replication) {
-      if (replication == safeReplication)
+      if (replication == safeReplication) {
         this.blockSafe++;
-      checkMode();
+        checkMode();
+      }
     }
       
     /**
@@ -3165,9 +3658,11 @@ public class FSNamesystem implements Nam
      * @param replication current replication 
      */
     private synchronized void decrementSafeBlockCount(short replication) {
-      if (replication == safeReplication-1)
+      if (replication == safeReplication-1) {
         this.blockSafe--;
-      checkMode();
+        assert blockSafe >= 0 || isManual();
+        checkMode();
+      }
     }
 
     /**
@@ -3285,16 +3780,45 @@ public class FSNamesystem implements Nam
       
     /**
      * Checks consistency of the class state.
-     * This is costly and currently called only in assert.
-     * @throws IOException 
+     * This is costly so only runs if asserts are enabled.
      */
-    private boolean isConsistent() throws IOException {
+    private void doConsistencyCheck() {
+      boolean assertsOn = false;
+      assert assertsOn = true; // set to true if asserts are on
+      if (!assertsOn) return;
+      
       if (blockTotal == -1 && blockSafe == -1) {
-        return true; // manual safe mode
+        return; // manual safe mode
       }
       int activeBlocks = blockManager.getActiveBlockCount();
-      return (blockTotal == activeBlocks) ||
-        (blockSafe >= 0 && blockSafe <= blockTotal);
+      if ((blockTotal != activeBlocks) &&
+          !(blockSafe >= 0 && blockSafe <= blockTotal)) {
+        throw new AssertionError(
+            " SafeMode: Inconsistent filesystem state: "
+        + "SafeMode data: blockTotal=" + blockTotal
+        + " blockSafe=" + blockSafe + "; "
+        + "BlockManager data: active="  + activeBlocks);
+      }
+    }
+
+    private synchronized void adjustBlockTotals(int deltaSafe, int deltaTotal) {
+      if (!shouldIncrementallyTrackBlocks) {
+        return;
+      }
+      assert haEnabled;
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting block totals from " +
+            blockSafe + "/" + blockTotal + " to " +
+            (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
+      }
+      assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
+        blockSafe + " by " + deltaSafe + ": would be negative";
+      assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
+        blockTotal + " by " + deltaTotal + ": would be negative";
+      
+      blockSafe += deltaSafe;
+      setBlockTotal(blockTotal + deltaTotal);
     }
   }
     
@@ -3376,6 +3900,9 @@ public class FSNamesystem implements Nam
 
   @Override
   public boolean isPopulatingReplQueues() {
+    if (isInStandbyState()) {
+      return false;
+    }
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -3398,13 +3925,30 @@ public class FSNamesystem implements Nam
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
-    safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
+    BlockInfo storedBlock = blockManager.getStoredBlock(b);
+    if (storedBlock.isComplete()) {
+      safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
+    }
+  }
+  
+  /**
+   * Adjust the total number of blocks safe and expected during safe mode.
+   * If safe mode is not currently on, this is a no-op.
+   * @param deltaSafe the change in number of safe blocks
+   * @param deltaTotal the change in number of total blocks expected
+   */
+  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode == null)
+      return;
+    safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
   }
 
   /**
    * Set the total number of blocks in the system. 
    */
-  void setBlockTotal() {
+  public void setBlockTotal() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -3440,7 +3984,8 @@ public class FSNamesystem implements Nam
           }
           assert node != null : "Found a lease for nonexisting file.";
           assert node.isUnderConstruction() :
-            "Found a lease for file that is not under construction.";
+            "Found a lease for file " + path + " that is not under construction." +
+            " lease=" + lease;
           INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
           BlockInfo[] blocks = cons.getBlocks();
           if(blocks == null)
@@ -3465,21 +4010,32 @@ public class FSNamesystem implements Nam
   void enterSafeMode(boolean resourcesLow) throws IOException {
     writeLock();
     try {
-    // Ensure that any concurrent operations have been fully synced
-    // before entering safe mode. This ensures that the FSImage
-    // is entirely stable on disk as soon as we're in safe mode.
-    getEditLog().logSyncAll();
-    if (!isInSafeMode()) {
-      safeMode = new SafeModeInfo(resourcesLow);
-      return;
-    }
-    if (resourcesLow) {
-      safeMode.setResourcesLow();
-    }
-    safeMode.setManual();
-    getEditLog().logSyncAll();
-    NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
-                                + safeMode.getTurnOffTip());
+      // Stop the secret manager, since rolling the master key would
+      // try to write to the edit log
+      stopSecretManager();
+
+      // Ensure that any concurrent operations have been fully synced
+      // before entering safe mode. This ensures that the FSImage
+      // is entirely stable on disk as soon as we're in safe mode.
+      boolean isEditlogOpenForWrite = getEditLog().isOpenForWrite();
+      // Before Editlog is in OpenForWrite mode, editLogStream will be null. So,
+      // logSyncAll call can be called only when Editlog is in OpenForWrite mode
+      if (isEditlogOpenForWrite) {
+        getEditLog().logSyncAll();
+      }
+      if (!isInSafeMode()) {
+        safeMode = new SafeModeInfo(resourcesLow);
+        return;
+      }
+      if (resourcesLow) {
+        safeMode.setResourcesLow();
+      }
+      safeMode.setManual();
+      if (isEditlogOpenForWrite) {
+        getEditLog().logSyncAll();
+      }
+      NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
+          + safeMode.getTurnOffTip());
     } finally {
       writeUnlock();
     }
@@ -3520,6 +4076,7 @@ public class FSNamesystem implements Nam
   CheckpointSignature rollEditLog() throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.JOURNAL);
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }
@@ -3536,6 +4093,8 @@ public class FSNamesystem implements Nam
   throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.CHECKPOINT);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Checkpoint not started", safeMode);
       }
@@ -3552,6 +4111,8 @@ public class FSNamesystem implements Nam
                             CheckpointSignature sig) throws IOException {
     readLock();
     try {
+      checkOperation(OperationCategory.CHECKPOINT);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Checkpoint not ended", safeMode);
       }
@@ -3704,6 +4265,34 @@ public class FSNamesystem implements Nam
     return blockManager.getExcessBlocksCount();
   }
   
+  // HA-only metric
+  @Metric
+  public long getPostponedMisreplicatedBlocks() {
+    return blockManager.getPostponedMisreplicatedBlocksCount();
+  }
+
+  // HA-only metric
+  @Metric
+  public int getPendingDataNodeMessageCount() {
+    return blockManager.getPendingDataNodeMessageCount();
+  }
+  
+  // HA-only metric
+  @Metric
+  public String getHAState() {
+    return haContext.getState().toString();
+  }
+
+  // HA-only metric
+  @Metric
+  public long getMillisSinceLastLoadedEdits() {
+    if (isInStandbyState() && editLogTailer != null) {
+      return now() - editLogTailer.getLastLoadTimestamp();
+    } else {
+      return 0;
+    }
+  }
+  
   @Metric
   public int getBlockCapacity() {
     return blockManager.getCapacity();
@@ -3715,6 +4304,7 @@ public class FSNamesystem implements Nam
   }
   
   private ObjectName mbeanName;
+
   /**
    * Register the FSNamesystem MBean using the name
    *        "hadoop:service=NameNode,name=FSNamesystemState"
@@ -3813,6 +4403,29 @@ public class FSNamesystem implements Nam
   }
   
   /**
+   * Client is reporting some bad block locations.
+   */
+  void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      
+      NameNode.stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
+      for (int i = 0; i < blocks.length; i++) {
+        ExtendedBlock blk = blocks[i].getBlock();
+        DatanodeInfo[] nodes = blocks[i].getLocations();
+        for (int j = 0; j < nodes.length; j++) {
+          DatanodeInfo dn = nodes[j];
+          blockManager.findAndMarkBlockAsCorrupt(blk, dn,
+              "client machine reported it");
+        }
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
    * Get a new generation stamp together with an access token for 
    * a block under construction
    * 
@@ -3829,6 +4442,8 @@ public class FSNamesystem implements Nam
     LocatedBlock locatedBlock;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       // check vadility of parameters
       checkUCBlock(block, clientName);
   
@@ -3858,6 +4473,8 @@ public class FSNamesystem implements Nam
       throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Pipeline not updated", safeMode);
       }
@@ -3873,7 +4490,7 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
     }
-    if (supportAppends) {
+    if (supportAppends || persistBlocks) {
       getEditLog().logSync();
     }
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@@ -4067,6 +4684,8 @@ public class FSNamesystem implements Nam
 
     readLock();
     try {
+      checkOperation(OperationCategory.READ);
+
       if (!isPopulatingReplQueues()) {
         throw new IOException("Cannot run listCorruptFileBlocks because " +
                               "replication queues have not been initialized.");
@@ -4159,6 +4778,8 @@ public class FSNamesystem implements Nam
     Token<DelegationTokenIdentifier> token;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot issue delegation token", safeMode);
       }
@@ -4203,6 +4824,8 @@ public class FSNamesystem implements Nam
     long expiryTime;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot renew delegation token", safeMode);
       }
@@ -4233,6 +4856,8 @@ public class FSNamesystem implements Nam
       throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot cancel delegation token", safeMode);
       }
@@ -4266,16 +4891,14 @@ public class FSNamesystem implements Nam
    * @param key new delegation key.
    */
   public void logUpdateMasterKey(DelegationKey key) throws IOException {
-    writeLock();
-    try {
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-          "Cannot log master key update in safe mode", safeMode);
-      }
-      getEditLog().logUpdateMasterKey(key);
-    } finally {
-      writeUnlock();
-    }
+    
+    assert !isInSafeMode() :
+      "this should never be called while in safemode, since we stop " +
+      "the DT manager before entering safemode!";
+    // No need to hold FSN lock since we don't access any internal
+    // structures, and this is stopped before the FSN shuts itself
+    // down, etc.
+    getEditLog().logUpdateMasterKey(key);
     getEditLog().logSync();
   }
   
@@ -4545,9 +5168,32 @@ public class FSNamesystem implements Nam
       byte[] password) throws InvalidToken {
     getDelegationTokenSecretManager().verifyToken(identifier, password);
   }
+  
+  public boolean isGenStampInFuture(long genStamp) {
+    return (genStamp > getGenerationStamp());
+  }
+  @VisibleForTesting
+  public EditLogTailer getEditLogTailer() {
+    return editLogTailer;
+  }
+  
+  @VisibleForTesting
+  void setFsLockForTests(ReentrantReadWriteLock lock) {
+    this.fsLock = lock;
+  }
+  
+  @VisibleForTesting
+  ReentrantReadWriteLock getFsLockForTests() {
+    return fsLock;
+  }
 
   @VisibleForTesting
   public SafeModeInfo getSafeModeInfoForTests() {
     return safeMode;
   }
+  
+  @VisibleForTesting
+  public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
+    this.nnResourceChecker = nnResourceChecker;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Sat Mar  3 00:42:49 2012
@@ -52,6 +52,7 @@ class FileJournalManager implements Jour
   private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
 
   private final StorageDirectory sd;
+  private final NNStorage storage;
   private int outputBufferCapacity = 512*1024;
 
   private static final Pattern EDITS_REGEX = Pattern.compile(
@@ -60,14 +61,14 @@ class FileJournalManager implements Jour
     NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
 
   private File currentInProgress = null;
-  private long maxSeenTransaction = 0L;
 
   @VisibleForTesting
   StoragePurger purger
     = new NNStorageRetentionManager.DeletionStoragePurger();
 
-  public FileJournalManager(StorageDirectory sd) {
+  public FileJournalManager(StorageDirectory sd, NNStorage storage) {
     this.sd = sd;
+    this.storage = storage;
   }
 
   @Override 
@@ -76,11 +77,16 @@ class FileJournalManager implements Jour
   @Override
   synchronized public EditLogOutputStream startLogSegment(long txid) 
       throws IOException {
-    currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
-    EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
-        outputBufferCapacity);
-    stm.create();
-    return stm;
+    try {
+      currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+      EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
+          outputBufferCapacity);
+      stm.create();
+      return stm;
+    } catch (IOException e) {
+      storage.reportErrorsOnDirectory(sd);
+      throw e;
+    }
   }
 
   @Override
@@ -90,13 +96,14 @@ class FileJournalManager implements Jour
 
     File dstFile = NNStorage.getFinalizedEditsFile(
         sd, firstTxId, lastTxId);
-    LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
+    LOG.info("Finalizing edits file " + inprogressFile + " -> " + dstFile);
     
     Preconditions.checkState(!dstFile.exists(),
         "Can't finalize edits file " + inprogressFile + " since finalized file " +
         "already exists");
     if (!inprogressFile.renameTo(dstFile)) {
-      throw new IOException("Unable to finalize edits file " + inprogressFile);
+      storage.reportErrorsOnDirectory(sd);
+      throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
     }
     if (inprogressFile.equals(currentInProgress)) {
       currentInProgress = null;
@@ -116,6 +123,7 @@ class FileJournalManager implements Jour
   @Override
   public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
+    LOG.info("Purging logs older than " + minTxIdToKeep);
     File[] files = FileUtil.listFiles(sd.getCurrentDir());
     List<EditLogFile> editLogs = 
       FileJournalManager.matchEditLogs(files);
@@ -135,18 +143,18 @@ class FileJournalManager implements Jour
    */
   List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
-    List<EditLogFile> allLogFiles = matchEditLogs(
-        FileUtil.listFiles(currentDir));
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
         allLogFiles.size());
 
     for (EditLogFile elf : allLogFiles) {
-      if (elf.isCorrupt() || elf.isInProgress()) continue;
+      if (elf.hasCorruptHeader() || elf.isInProgress()) continue;
       if (elf.getFirstTxId() >= firstTxId) {
         ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
       } else if ((firstTxId > elf.getFirstTxId()) &&
                  (firstTxId <= elf.getLastTxId())) {
-        throw new IOException("Asked for firstTxId " + firstTxId
+        // Note that this behavior is different from getLogFiles below.
+        throw new IllegalStateException("Asked for firstTxId " + firstTxId
             + " which is in the middle of file " + elf.file);
       }
     }
@@ -154,6 +162,20 @@ class FileJournalManager implements Jour
     return ret;
   }
 
+  /**
+   * Returns matching edit logs from the log directory. Simple helper function
+   * that lists the files in the logDir and calls matchEditLogs(File[])
+   * 
+   * @param logDir
+   *          directory to match edit logs in
+   * @return matched edit logs
+   * @throws IOException
+   *           IOException thrown for invalid logDir
+   */
+  static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+    return matchEditLogs(FileUtil.listFiles(logDir));
+  }
+  
   static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
     List<EditLogFile> ret = Lists.newArrayList();
     for (File f : filesInStorage) {
@@ -169,7 +191,7 @@ class FileJournalManager implements Jour
           LOG.error("Edits file " + f + " has improperly formatted " +
                     "transaction ID");
           // skip
-        }          
+        }
       }
       
       // Check for in-progress edits
@@ -190,27 +212,37 @@ class FileJournalManager implements Jour
   }
 
   @Override
-  synchronized public EditLogInputStream getInputStream(long fromTxId) 
-      throws IOException {
+  synchronized public EditLogInputStream getInputStream(long fromTxId,
+      boolean inProgressOk) throws IOException {
     for (EditLogFile elf : getLogFiles(fromTxId)) {
-      if (elf.getFirstTxId() == fromTxId) {
+      if (elf.containsTxId(fromTxId)) {
+        if (!inProgressOk && elf.isInProgress()) {
+          continue;
+        }
         if (elf.isInProgress()) {
           elf.validateLog();
         }
         if (LOG.isTraceEnabled()) {
           LOG.trace("Returning edit stream reading from " + elf);
         }
-        return new EditLogFileInputStream(elf.getFile(), 
-            elf.getFirstTxId(), elf.getLastTxId());
+        EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
+            elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
+        long transactionsToSkip = fromTxId - elf.getFirstTxId();
+        if (transactionsToSkip > 0) {
+          LOG.info(String.format("Log begins at txid %d, but requested start "
+              + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
+              transactionsToSkip));
+          elfis.skipTransactions(transactionsToSkip);
+        }
+        return elfis;
       }
     }
 
-    throw new IOException("Cannot find editlog file with " + fromTxId
-        + " as first first txid");
+    throw new IOException("Cannot find editlog file containing " + fromTxId);
   }
 
   @Override
-  public long getNumberOfTransactions(long fromTxId) 
+  public long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
       throws IOException, CorruptionException {
     long numTxns = 0L;
     
@@ -222,21 +254,25 @@ class FileJournalManager implements Jour
         LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
             + fromTxId + " - " + (elf.getFirstTxId() - 1));
         break;
-      } else if (fromTxId == elf.getFirstTxId()) {
+      } else if (elf.containsTxId(fromTxId)) {
+        if (!inProgressOk && elf.isInProgress()) {
+          break;
+        }
+        
         if (elf.isInProgress()) {
           elf.validateLog();
         } 
 
-        if (elf.isCorrupt()) {
+        if (elf.hasCorruptHeader()) {
           break;
         }
+        numTxns += elf.getLastTxId() + 1 - fromTxId;
         fromTxId = elf.getLastTxId() + 1;
-        numTxns += fromTxId - elf.getFirstTxId();
         
         if (elf.isInProgress()) {
           break;
         }
-      } // else skip
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -244,7 +280,8 @@ class FileJournalManager implements Jour
                 + " txns from " + fromTxId);
     }
 
-    long max = findMaxTransaction();
+    long max = findMaxTransaction(inProgressOk);
+    
     // fromTxId should be greater than max, as it points to the next 
     // transaction we should expect to find. If it is less than or equal
     // to max, it means that a transaction with txid == max has not been found
@@ -261,23 +298,44 @@ class FileJournalManager implements Jour
   @Override
   synchronized public void recoverUnfinalizedSegments() throws IOException {
     File currentDir = sd.getCurrentDir();
-    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
-    
-    // make sure journal is aware of max seen transaction before moving corrupt 
-    // files aside
-    findMaxTransaction();
+    LOG.info("Recovering unfinalized segments in " + currentDir);
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
 
     for (EditLogFile elf : allLogFiles) {
       if (elf.getFile().equals(currentInProgress)) {
         continue;
       }
       if (elf.isInProgress()) {
+        // If the file is zero-length, we likely just crashed after opening the
+        // file, but before writing anything to it. Safe to delete it.
+        if (elf.getFile().length() == 0) {
+          LOG.info("Deleting zero-length edit log file " + elf);
+          if (!elf.getFile().delete()) {
+            throw new IOException("Unable to delete file " + elf.getFile());
+          }
+          continue;
+        }
+        
         elf.validateLog();
 
-        if (elf.isCorrupt()) {
+        if (elf.hasCorruptHeader()) {
           elf.moveAsideCorruptFile();
+          throw new CorruptionException("In-progress edit log file is corrupt: "
+              + elf);
+        }
+        
+        // If the file has a valid header (isn't corrupt) but contains no
+        // transactions, we likely just crashed after opening the file and
+        // writing the header, but before syncing any transactions. Safe to
+        // delete the file.
+        if (elf.getNumTransactions() == 0) {
+          LOG.info("Deleting edit log file with zero transactions " + elf);
+          if (!elf.getFile().delete()) {
+            throw new IOException("Unable to delete " + elf.getFile());
+          }
           continue;
         }
+        
         finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
       }
     }
@@ -285,16 +343,12 @@ class FileJournalManager implements Jour
 
   private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
-    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<EditLogFile> logFiles = Lists.newArrayList();
     
     for (EditLogFile elf : allLogFiles) {
-      if (fromTxId > elf.getFirstTxId()
-          && fromTxId <= elf.getLastTxId()) {
-        throw new IOException("Asked for fromTxId " + fromTxId
-            + " which is in middle of file " + elf.file);
-      }
-      if (fromTxId <= elf.getFirstTxId()) {
+      if (fromTxId <= elf.getFirstTxId() ||
+          elf.containsTxId(fromTxId)) {
         logFiles.add(elf);
       }
     }
@@ -306,21 +360,35 @@ class FileJournalManager implements Jour
 
   /** 
    * Find the maximum transaction in the journal.
-   * This gets stored in a member variable, as corrupt edit logs
-   * will be moved aside, but we still need to remember their first
-   * tranaction id in the case that it was the maximum transaction in
-   * the journal.
    */
-  private long findMaxTransaction()
+  private long findMaxTransaction(boolean inProgressOk)
       throws IOException {
+    boolean considerSeenTxId = true;
+    long seenTxId = NNStorage.readTransactionIdFile(sd);
+    long maxSeenTransaction = 0;
     for (EditLogFile elf : getLogFiles(0)) {
+      if (elf.isInProgress() && !inProgressOk) {
+        if (elf.getFirstTxId() != HdfsConstants.INVALID_TXID &&
+            elf.getFirstTxId() <= seenTxId) {
+          // don't look at the seen_txid file if in-progress logs are not to be
+          // examined, and the value in seen_txid falls within the in-progress
+          // segment.
+          considerSeenTxId = false;
+        }
+        continue;
+      }
+      
       if (elf.isInProgress()) {
         maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
         elf.validateLog();
       }
       maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
     }
-    return maxSeenTransaction;
+    if (considerSeenTxId) {
+      return Math.max(maxSeenTransaction, seenTxId);
+    } else {
+      return maxSeenTransaction;
+    }
   }
 
   @Override
@@ -335,8 +403,9 @@ class FileJournalManager implements Jour
     private File file;
     private final long firstTxId;
     private long lastTxId;
+    private long numTx = -1;
 
-    private boolean isCorrupt = false;
+    private boolean hasCorruptHeader = false;
     private final boolean isInProgress;
 
     final static Comparator<EditLogFile> COMPARE_BY_START_TXID 
@@ -376,6 +445,10 @@ class FileJournalManager implements Jour
     long getLastTxId() {
       return lastTxId;
     }
+    
+    boolean containsTxId(long txId) {
+      return firstTxId <= txId && txId <= lastTxId;
+    }
 
     /** 
      * Count the number of valid transactions in a log.
@@ -384,11 +457,13 @@ class FileJournalManager implements Jour
      */
     void validateLog() throws IOException {
       EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
-      if (val.getNumTransactions() == 0) {
-        markCorrupt();
-      } else {
-        this.lastTxId = val.getEndTxId();
-      }
+      this.numTx = val.getNumTransactions();
+      this.lastTxId = val.getEndTxId();
+      this.hasCorruptHeader = val.hasCorruptHeader();
+    }
+    
+    long getNumTransactions() {
+      return numTx;
     }
 
     boolean isInProgress() {
@@ -399,16 +474,12 @@ class FileJournalManager implements Jour
       return file;
     }
     
-    void markCorrupt() {
-      isCorrupt = true;
-    }
-    
-    boolean isCorrupt() {
-      return isCorrupt;
+    boolean hasCorruptHeader() {
+      return hasCorruptHeader;
     }
 
     void moveAsideCorruptFile() throws IOException {
-      assert isCorrupt;
+      assert hasCorruptHeader;
     
       File src = file;
       File dst = new File(src.getParent(), src.getName() + ".corrupt");
@@ -423,8 +494,9 @@ class FileJournalManager implements Jour
     @Override
     public String toString() {
       return String.format("EditLogFile(file=%s,first=%019d,last=%019d,"
-                           +"inProgress=%b,corrupt=%b)", file.toString(),
-                           firstTxId, lastTxId, isInProgress(), isCorrupt);
+                           +"inProgress=%b,hasCorruptHeader=%b,numTx=%d)",
+                           file.toString(), firstTxId, lastTxId,
+                           isInProgress(), hasCorruptHeader, numTx);
     }
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Sat Mar  3 00:42:49 2012
@@ -124,16 +124,18 @@ public class GetImageServlet extends Htt
             final long txid = parsedParams.getTxId();
 
             if (! currentlyDownloadingCheckpoints.add(txid)) {
-              throw new IOException(
+              response.sendError(HttpServletResponse.SC_CONFLICT,
                   "Another checkpointer is already in the process of uploading a" +
                   " checkpoint made at transaction ID " + txid);
+              return null;
             }
 
             try {
               if (nnImage.getStorage().findImageFile(txid) != null) {
-                throw new IOException(
+                response.sendError(HttpServletResponse.SC_CONFLICT,
                     "Another checkpointer already uploaded an checkpoint " +
                     "for txid " + txid);
+                return null;
               }
               
               // issue a HTTP get request to download the new fsimage 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Sat Mar  3 00:42:49 2012
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 
+import com.google.common.base.Joiner;
+
 /**
  * I-node for file being written.
  */
@@ -41,19 +43,7 @@ public class INodeFileUnderConstruction 
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    this(permissions, 0, replication, preferredBlockSize, modTime,
-        clientName, clientMachine, clientNode);
-  }
-
-  INodeFileUnderConstruction(PermissionStatus permissions,
-                             int nrBlocks,
-                             short replication,
-                             long preferredBlockSize,
-                             long modTime,
-                             String clientName,
-                             String clientMachine,
-                             DatanodeDescriptor clientNode) {
-    super(permissions.applyUMask(UMASK), nrBlocks, replication,
+    super(permissions.applyUMask(UMASK), 0, replication,
         modTime, modTime, preferredBlockSize);
     this.clientName = clientName;
     this.clientMachine = clientMachine;
@@ -106,6 +96,9 @@ public class INodeFileUnderConstruction 
   // use the modification time as the access time
   //
   INodeFile convertToInodeFile() {
+    assert allBlocksComplete() :
+      "Can't finalize inode " + this + " since it contains " +
+      "non-complete blocks! Blocks are: " + blocksAsString();
     INodeFile obj = new INodeFile(getPermissionStatus(),
                                   getBlocks(),
                                   getReplication(),
@@ -115,6 +108,18 @@ public class INodeFileUnderConstruction 
     return obj;
     
   }
+  
+  /**
+   * @return true if all of the blocks in this file are marked as completed.
+   */
+  private boolean allBlocksComplete() {
+    for (BlockInfo b : blocks) {
+      if (!b.isComplete()) {
+        return false;
+      }
+    }
+    return true;
+  }
 
   /**
    * Remove a block from the block list. This block should be
@@ -153,4 +158,8 @@ public class INodeFileUnderConstruction 
     setBlock(numBlocks()-1, ucBlock);
     return ucBlock;
   }
+  
+  private String blocksAsString() {
+    return Joiner.on(",").join(this.blocks);
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Sat Mar  3 00:42:49 2012
@@ -48,20 +48,23 @@ public interface JournalManager extends 
    /**
    * Get the input stream starting with fromTxnId from this journal manager
    * @param fromTxnId the first transaction id we want to read
+   * @param inProgressOk whether or not in-progress streams should be returned
    * @return the stream starting with transaction fromTxnId
    * @throws IOException if a stream cannot be found.
    */
-  EditLogInputStream getInputStream(long fromTxnId) throws IOException;
+  EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
+    throws IOException;
 
   /**
    * Get the number of transaction contiguously available from fromTxnId.
    *
    * @param fromTxnId Transaction id to count from
+   * @param inProgressOk whether or not in-progress streams should be counted
    * @return The number of transactions available from fromTxnId
    * @throws IOException if the journal cannot be read.
    * @throws CorruptionException if there is a gap in the journal at fromTxnId.
    */
-  long getNumberOfTransactions(long fromTxnId) 
+  long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
       throws IOException, CorruptionException;
 
   /**



Mime
View raw message