hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject [2/2] hadoop git commit: HDFS-9129. Move the safemode block count into BlockManager. Contributed by Mingliang Liu.
Date Wed, 02 Dec 2015 00:12:19 GMT
HDFS-9129. Move the safemode block count into BlockManager. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a49cc74b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a49cc74b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a49cc74b

Branch: refs/heads/trunk
Commit: a49cc74b4c72195dee1dfb6f9548e5e411dff553
Parents: 58f6f54
Author: Jing Zhao <jing9@apache.org>
Authored: Tue Dec 1 16:09:19 2015 -0800
Committer: Jing Zhao <jing9@apache.org>
Committed: Tue Dec 1 16:09:19 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |  95 ++-
 .../blockmanagement/BlockManagerSafeMode.java   | 573 +++++++++++++
 .../server/blockmanagement/DatanodeManager.java |   6 +-
 .../hdfs/server/namenode/Checkpointer.java      |   3 +-
 .../hdfs/server/namenode/FSDirDeleteOp.java     |   2 +-
 .../hdfs/server/namenode/FSDirRenameOp.java     |   3 +-
 .../hdfs/server/namenode/FSDirTruncateOp.java   |   2 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   3 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 855 ++-----------------
 .../hadoop/hdfs/server/namenode/NameNode.java   |  10 +-
 .../hadoop/hdfs/server/namenode/Namesystem.java |  15 +-
 .../hadoop/hdfs/server/namenode/SafeMode.java   |  18 -
 .../org/apache/hadoop/hdfs/TestSafeMode.java    |   2 +-
 .../blockmanagement/BlockManagerTestUtil.java   |   9 +
 .../TestBlockManagerSafeMode.java               | 420 +++++++++
 .../blockmanagement/TestReplicationPolicy.java  |   2 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |  10 +-
 .../hdfs/server/namenode/TestFSNamesystem.java  |   9 +-
 .../TestNameNodeMetadataConsistency.java        |   2 -
 .../hdfs/server/namenode/ha/TestHASafeMode.java |  11 +-
 21 files changed, 1208 insertions(+), 845 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3e1718d..ee6d38f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -879,6 +879,9 @@ Release 2.9.0 - UNRELEASED
 
   IMPROVEMENTS
 
+      HDFS-9129. Move the safemode block count into BlockManager. (Mingliang Liu
+      via jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 3033eaa..8c94c03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -125,6 +126,8 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final Namesystem namesystem;
 
+  private final BlockManagerSafeMode bmSafeMode;
+
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
@@ -380,6 +383,8 @@ public class BlockManager implements BlockStatsMXBean {
     this.numberOfBytesInFutureBlocks = new AtomicLong();
     this.inRollBack = isInRollBackMode(NameNode.getStartupOption(conf));
 
+    bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf);
+
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
@@ -488,15 +493,17 @@ public class BlockManager implements BlockStatsMXBean {
         : false;
   }
 
-  public void activate(Configuration conf) {
+  public void activate(Configuration conf, long blockTotal) {
     pendingReplications.start();
     datanodeManager.activate(conf);
     this.replicationThread.setName("ReplicationMonitor");
     this.replicationThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
+    bmSafeMode.activate(blockTotal);
   }
 
   public void close() {
+    bmSafeMode.close();
     try {
       replicationThread.interrupt();
       replicationThread.join(3000);
@@ -741,11 +748,11 @@ public class BlockManager implements BlockStatsMXBean {
     // count. (We may not have the minimum replica count yet if this is
     // a "forced" completion when a file is getting closed by an
     // OP_CLOSE edit on the standby).
-    namesystem.adjustSafeModeBlockTotals(0, 1);
+    bmSafeMode.adjustBlockTotals(0, 1);
     final int minStorage = curBlock.isStriped() ?
         ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
-    namesystem.incrementSafeBlockCount(
-        Math.min(numNodes, minStorage), curBlock);
+    bmSafeMode.incrementSafeBlockCount(Math.min(numNodes, minStorage),
+        curBlock);
   }
 
   /**
@@ -805,7 +812,7 @@ public class BlockManager implements BlockStatsMXBean {
     
     // Adjust safe-mode totals, since under-construction blocks don't
     // count in safe-mode.
-    namesystem.adjustSafeModeBlockTotals(
+    bmSafeMode.adjustBlockTotals(
         // decrement safe if we had enough
         hasMinStorage(lastBlock, targets.length) ? -1 : 0,
         // always decrement total blocks
@@ -1188,7 +1195,7 @@ public class BlockManager implements BlockStatsMXBean {
         invalidateBlocks.remove(node, b);
       }
     }
-    namesystem.checkSafeMode();
+    checkSafeMode();
   }
 
   /**
@@ -1933,6 +1940,74 @@ public class BlockManager implements BlockStatsMXBean {
     return leaseId;
   }
 
+  public void registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
+    assert namesystem.hasWriteLock();
+    datanodeManager.registerDatanode(nodeReg);
+    bmSafeMode.checkSafeMode();
+  }
+
+  /**
+   * Set the total number of blocks in the system.
+   * If safe mode is not currently on, this is a no-op.
+   */
+  public void setBlockTotal(long total) {
+    if (bmSafeMode.isInSafeMode()) {
+      bmSafeMode.setBlockTotal(total);
+      bmSafeMode.checkSafeMode();
+    }
+  }
+
+  public boolean isInSafeMode() {
+    return bmSafeMode.isInSafeMode();
+  }
+
+  public String getSafeModeTip() {
+    return bmSafeMode.getSafeModeTip();
+  }
+
+  public void leaveSafeMode(boolean force) {
+    bmSafeMode.leaveSafeMode(force);
+  }
+
+  void checkSafeMode() {
+    bmSafeMode.checkSafeMode();
+  }
+
+  /**
+   * Removes the blocks from blocksmap and updates the safemode blocks total.
+   * @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a
+   *               list of blocks that need to be removed from blocksMap
+   */
+  public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
+    assert namesystem.hasWriteLock();
+    // In the case that we are a Standby tailing edits from the
+    // active while in safe-mode, we need to track the total number
+    // of blocks and safe blocks in the system.
+    boolean trackBlockCounts = bmSafeMode.isSafeModeTrackingBlocks();
+    int numRemovedComplete = 0, numRemovedSafe = 0;
+
+    for (BlockInfo b : blocks.getToDeleteList()) {
+      if (trackBlockCounts) {
+        if (b.isComplete()) {
+          numRemovedComplete++;
+          if (hasMinStorage(b, b.numNodes())) {
+            numRemovedSafe++;
+          }
+        }
+      }
+      removeBlock(b);
+    }
+    if (trackBlockCounts) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting safe-mode totals for deletion."
+            + "decreasing safeBlocks by " + numRemovedSafe
+            + ", totalBlocks by " + numRemovedComplete);
+      }
+      bmSafeMode.adjustBlockTotals(-numRemovedSafe, -numRemovedComplete);
+    }
+  }
+
   /**
    * StatefulBlockInfo is used to build the "toUC" list, which is a list of
    * updates to the information about under-construction blocks.
@@ -2333,7 +2408,7 @@ public class BlockManager implements BlockStatsMXBean {
         if (namesystem.isInSnapshot(storedBlock)) {
           int numOfReplicas = storedBlock.getUnderConstructionFeature()
               .getNumExpectedLocations();
-          namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
+          bmSafeMode.incrementSafeBlockCount(numOfReplicas, storedBlock);
         }
         //and fall through to next clause
       }      
@@ -2732,7 +2807,7 @@ public class BlockManager implements BlockStatsMXBean {
       // only complete blocks are counted towards that.
       // In the case that the block just became complete above, completeBlock()
       // handles the safe block count maintenance.
-      namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
+      bmSafeMode.incrementSafeBlockCount(numCurrentReplica, storedBlock);
     }
   }
 
@@ -2808,7 +2883,7 @@ public class BlockManager implements BlockStatsMXBean {
       // Is no-op if not in safe mode.
       // In the case that the block just became complete above, completeBlock()
       // handles the safe block count maintenance.
-      namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
+      bmSafeMode.incrementSafeBlockCount(numCurrentReplica, storedBlock);
     }
     
     // if file is under construction, then done for now
@@ -3352,7 +3427,7 @@ public class BlockManager implements BlockStatsMXBean {
       //
       BlockCollection bc = getBlockCollection(storedBlock);
       if (bc != null) {
-        namesystem.decrementSafeBlockCount(storedBlock);
+        bmSafeMode.decrementSafeBlockCount(storedBlock);
         updateNeededReplications(storedBlock, -1, 0);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java
new file mode 100644
index 0000000..297532e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.Daemon;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Block manager safe mode info.
+ *
+ * During name node startup, counts the number of <em>safe blocks</em>, those
+ * that have at least the minimal number of replicas, and calculates the ratio
+ * of safe blocks to the total number of blocks in the system, which is the size
+ * of blocks. When the ratio reaches the {@link #threshold} and enough live data
+ * nodes have registered, it needs to wait for the safe mode {@link #extension}
+ * interval. After the extension period has passed, it will not leave safe mode
+ * until the safe blocks ratio reaches the {@link #threshold} and enough live
+ * data node registered.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class BlockManagerSafeMode {
+  enum BMSafeModeStatus {
+    PENDING_THRESHOLD, // Pending on more safe blocks or live datanodes.
+    EXTENSION,         // In extension period.
+    OFF                // Safe mode is off.
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(BlockManagerSafeMode.class);
+  static final Step STEP_AWAITING_REPORTED_BLOCKS =
+      new Step(StepType.AWAITING_REPORTED_BLOCKS);
+
+  private final BlockManager blockManager;
+  private final Namesystem namesystem;
+  private final boolean haEnabled;
+  private volatile BMSafeModeStatus status = BMSafeModeStatus.OFF;
+
+  /** Safe mode threshold condition %.*/
+  private final double threshold;
+  /** Number of blocks needed to satisfy safe mode threshold condition. */
+  private long blockThreshold;
+  /** Total number of blocks. */
+  private long blockTotal;
+  /** Number of safe blocks. */
+  private long blockSafe;
+  /** Safe mode minimum number of datanodes alive. */
+  private final int datanodeThreshold;
+  /** Min replication required by safe mode. */
+  private final int safeReplication;
+  /** Threshold for populating needed replication queues. */
+  private final double replQueueThreshold;
+  /** Number of blocks needed before populating replication queues. */
+  private long blockReplQueueThreshold;
+
+  /** How long (in ms) is the extension period. */
+  private final int extension;
+  /** Timestamp of the first time when thresholds are met. */
+  private final AtomicLong reachedTime = new AtomicLong();
+  /** Timestamp of when safe mode was initialized. */
+  private long startTime;
+  /** the safe mode monitor thread. */
+  private final Daemon smmthread = new Daemon(new SafeModeMonitor());
+
+  /** time of the last status printout */
+  private long lastStatusReport;
+  /** Counter for tracking startup progress of reported blocks. */
+  private Counter awaitingReportedBlocksCounter;
+
+  BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
+      Configuration conf) {
+    this.blockManager = blockManager;
+    this.namesystem = namesystem;
+    this.haEnabled = namesystem.isHaEnabled();
+    this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
+        DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
+    if (this.threshold > 1.0) {
+      LOG.warn("The threshold value should't be greater than 1, threshold: {}",
+          threshold);
+    }
+    this.datanodeThreshold = conf.getInt(
+        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
+        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
+    int minReplication =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+    // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
+    // setting this lower than the min replication is not recommended
+    // and/or dangerous for production setups.
+    // When it's unset, safeReplication will use dfs.namenode.replication.min
+    this.safeReplication =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
+            minReplication);
+    // default to safe mode threshold (i.e., don't populate queues before
+    // leaving safe mode)
+    this.replQueueThreshold =
+        conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
+            (float) threshold);
+
+    this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
+
+    LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold);
+    LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
+        datanodeThreshold);
+    LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, extension);
+  }
+
+  /**
+   * Initialize the safe mode information.
+   * @param total initial total blocks
+   */
+  void activate(long total) {
+    assert namesystem.hasWriteLock();
+    assert status == BMSafeModeStatus.OFF;
+
+    startTime = monotonicNow();
+    setBlockTotal(total);
+    if (areThresholdsMet()) {
+      leaveSafeMode(true);
+    } else {
+      // enter safe mode
+      status = BMSafeModeStatus.PENDING_THRESHOLD;
+      initializeReplQueuesIfNecessary();
+      reportStatus("STATE* Safe mode ON.", true);
+      lastStatusReport = monotonicNow();
+    }
+  }
+
+  /**
+   * @return true if it stays in start up safe mode else false.
+   */
+  boolean isInSafeMode() {
+    if (status != BMSafeModeStatus.OFF) {
+      doConsistencyCheck();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * The transition of the safe mode state machine.
+   * If safe mode is not currently on, this is a no-op.
+   */
+  void checkSafeMode() {
+    assert namesystem.hasWriteLock();
+    if (namesystem.inTransitionToActive()) {
+      return;
+    }
+
+    switch (status) {
+    case PENDING_THRESHOLD:
+      if (areThresholdsMet()) {
+        if (extension > 0) {
+          // PENDING_THRESHOLD -> EXTENSION
+          status = BMSafeModeStatus.EXTENSION;
+          reachedTime.set(monotonicNow());
+          smmthread.start();
+          initializeReplQueuesIfNecessary();
+          reportStatus("STATE* Safe mode extension entered.", true);
+        } else {
+          // PENDING_THRESHOLD -> OFF
+          leaveSafeMode(false);
+        }
+      } else {
+        initializeReplQueuesIfNecessary();
+        reportStatus("STATE* Safe mode ON.", false);
+      }
+      break;
+    case EXTENSION:
+      reportStatus("STATE* Safe mode ON.", false);
+      break;
+    case OFF:
+      break;
+    default:
+      assert false : "Non-recognized block manager safe mode status: " + status;
+    }
+  }
+
+  /**
+   * Adjust the total number of blocks safe and expected during safe mode.
+   * If safe mode is not currently on, this is a no-op.
+   * @param deltaSafe  the change in number of safe blocks
+   * @param deltaTotal the change in number of total blocks expected
+   */
+  void adjustBlockTotals(int deltaSafe, int deltaTotal) {
+    assert namesystem.hasWriteLock();
+    if (!isSafeModeTrackingBlocks()) {
+      return;
+    }
+
+    long newBlockTotal;
+    synchronized (this) {
+      LOG.debug("Adjusting block totals from {}/{} to {}/{}",  blockSafe,
+          blockTotal, blockSafe + deltaSafe, blockTotal + deltaTotal);
+      assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
+          blockSafe + " by " + deltaSafe + ": would be negative";
+      assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
+          blockTotal + " by " + deltaTotal + ": would be negative";
+
+      blockSafe += deltaSafe;
+      newBlockTotal = blockTotal + deltaTotal;
+    }
+    setBlockTotal(newBlockTotal);
+    checkSafeMode();
+  }
+
+  /**
+   * Should we track blocks in safe mode.
+   * <p/>
+   * Never track blocks incrementally in non-HA code.
+   * <p/>
+   * In the HA case, the StandbyNode can be in safemode while the namespace
+   * is modified by the edit log tailer. In this case, the number of total
+   * blocks changes as edits are processed (eg blocks are added and deleted).
+   * However, we don't want to do the incremental tracking during the
+   * startup-time loading process -- only once the initial total has been
+   * set after the image has been loaded.
+   */
+  boolean isSafeModeTrackingBlocks() {
+    assert namesystem.hasWriteLock();
+    return haEnabled && status != BMSafeModeStatus.OFF;
+  }
+
+  /**
+   * Set total number of blocks.
+   */
+  void setBlockTotal(long total) {
+    assert namesystem.hasWriteLock();
+    synchronized (this) {
+      this.blockTotal = total;
+      this.blockThreshold = (long) (total * threshold);
+    }
+    this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
+  }
+
+  String getSafeModeTip() {
+    String msg = "";
+
+    synchronized (this) {
+      if (blockSafe < blockThreshold) {
+        msg += String.format(
+            "The reported blocks %d needs additional %d"
+                + " blocks to reach the threshold %.4f of total blocks %d.%n",
+            blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
+      } else {
+        msg += String.format("The reported blocks %d has reached the threshold"
+            + " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal);
+      }
+    }
+
+    int numLive = blockManager.getDatanodeManager().getNumLiveDataNodes();
+    if (numLive < datanodeThreshold) {
+      msg += String.format(
+          "The number of live datanodes %d needs an additional %d live "
+              + "datanodes to reach the minimum number %d.%n",
+          numLive, (datanodeThreshold - numLive), datanodeThreshold);
+    } else {
+      msg += String.format("The number of live datanodes %d has reached "
+              + "the minimum number %d. ",
+          numLive, datanodeThreshold);
+    }
+
+    if (blockManager.getBytesInFuture() > 0) {
+      msg += "Name node detected blocks with generation stamps " +
+          "in future. This means that Name node metadata is inconsistent." +
+          "This can happen if Name node metadata files have been manually " +
+          "replaced. Exiting safe mode will cause loss of " + blockManager
+          .getBytesInFuture() + " byte(s). Please restart name node with " +
+          "right metadata or use \"hdfs dfsadmin -safemode forceExit" +
+          "if you are certain that the NameNode was started with the" +
+          "correct FsImage and edit logs. If you encountered this during" +
+          "a rollback, it is safe to exit with -safemode forceExit.";
+      return msg;
+    }
+
+    final String turnOffTip = "Safe mode will be turned off automatically ";
+    switch(status) {
+    case PENDING_THRESHOLD:
+      msg += turnOffTip + "once the thresholds have been reached.";
+      break;
+    case EXTENSION:
+      msg += "In safe mode extension. "+ turnOffTip + "in " +
+          timeToLeaveExtension() / 1000 + " seconds.";
+      break;
+    case OFF:
+      msg += turnOffTip + "soon.";
+      break;
+    default:
+      assert false : "Non-recognized block manager safe mode status: " + status;
+    }
+    return msg;
+  }
+
+  /**
+   * Leave start up safe mode.
+   * @param force - true to force exit
+   */
+  void leaveSafeMode(boolean force) {
+    assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!";
+
+    // if not done yet, initialize replication queues.
+    // In the standby, do not populate repl queues
+    if (!blockManager.isPopulatingReplQueues() &&
+        blockManager.shouldPopulateReplQueues()) {
+      blockManager.initializeReplQueues();
+    }
+
+    if (!force && blockManager.getBytesInFuture() > 0) {
+      LOG.error("Refusing to leave safe mode without a force flag. " +
+          "Exiting safe mode will cause a deletion of {} byte(s). Please use " +
+          "-forceExit flag to exit safe mode forcefully if data loss is " +
+          "acceptable.", blockManager.getBytesInFuture());
+      return;
+    }
+
+    if (status != BMSafeModeStatus.OFF) {
+      NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
+    }
+    status = BMSafeModeStatus.OFF;
+
+    final long timeInSafemode = monotonicNow() - startTime;
+    NameNode.stateChangeLog.info("STATE* Leaving safe mode after {} secs",
+        timeInSafemode / 1000);
+    NameNode.getNameNodeMetrics().setSafeModeTime(timeInSafemode);
+
+    final NetworkTopology nt = blockManager.getDatanodeManager()
+        .getNetworkTopology();
+    NameNode.stateChangeLog.info("STATE* Network topology has {} racks and {}" +
+        " datanodes", nt.getNumOfRacks(), nt.getNumOfLeaves());
+    NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has {} blocks",
+        blockManager.numOfUnderReplicatedBlocks());
+
+    namesystem.startSecretManagerIfNecessary();
+
+    // If startup has not yet completed, end safemode phase.
+    StartupProgress prog = NameNode.getStartupProgress();
+    if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
+      prog.endStep(Phase.SAFEMODE,
+          BlockManagerSafeMode.STEP_AWAITING_REPORTED_BLOCKS);
+      prog.endPhase(Phase.SAFEMODE);
+    }
+  }
+
+  /**
+   * Increment number of safe blocks if current block has reached minimal
+   * replication.
+   * If safe mode is not currently on, this is a no-op.
+   * @param storageNum  current number of replicas or number of internal blocks
+   *                    of a striped block group
+   * @param storedBlock current storedBlock which is either a
+   *                    BlockInfoContiguous or a BlockInfoStriped
+   */
+  synchronized void incrementSafeBlockCount(int storageNum,
+      BlockInfo storedBlock) {
+    assert namesystem.hasWriteLock();
+    if (status == BMSafeModeStatus.OFF) {
+      return;
+    }
+
+    final int safe = storedBlock.isStriped() ?
+        ((BlockInfoStriped)storedBlock).getRealDataBlockNum() : safeReplication;
+    if (storageNum == safe) {
+      this.blockSafe++;
+
+      // Report startup progress only if we haven't completed startup yet.
+      StartupProgress prog = NameNode.getStartupProgress();
+      if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
+        if (this.awaitingReportedBlocksCounter == null) {
+          this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
+              STEP_AWAITING_REPORTED_BLOCKS);
+        }
+        this.awaitingReportedBlocksCounter.increment();
+      }
+
+      checkSafeMode();
+    }
+  }
+
+  /**
+   * Decrement number of safe blocks if current block has fallen below minimal
+   * replication.
+   * If safe mode is not currently on, this is a no-op.
+   */
+  synchronized void decrementSafeBlockCount(BlockInfo b) {
+    assert namesystem.hasWriteLock();
+    if (status == BMSafeModeStatus.OFF) {
+      return;
+    }
+
+    BlockInfo storedBlock = blockManager.getStoredBlock(b);
+    if (storedBlock.isComplete() &&
+        blockManager.countNodes(b).liveReplicas() == safeReplication - 1) {
+      this.blockSafe--;
+      assert blockSafe >= 0;
+      checkSafeMode();
+    }
+  }
+
+  void close() {
+    assert namesystem.hasWriteLock() : "Closing bmSafeMode needs write lock!";
+    try {
+      smmthread.interrupt();
+      smmthread.join(3000);
+    } catch (InterruptedException ignored) {
+    }
+  }
+
+  /**
+   * Get time (counting in milliseconds) left to leave extension period.
+   *
+   * Negative value indicates the extension period has passed.
+   */
+  private long timeToLeaveExtension() {
+    return reachedTime.get() + extension - monotonicNow();
+  }
+
+  /** Check if we are ready to initialize replication queues. */
+  private void initializeReplQueuesIfNecessary() {
+    assert namesystem.hasWriteLock();
+    // Whether it has reached the threshold for initializing replication queues.
+    boolean canInitializeReplQueues = blockManager.shouldPopulateReplQueues() &&
+        blockSafe >= blockReplQueueThreshold;
+    if (canInitializeReplQueues &&
+        !blockManager.isPopulatingReplQueues() &&
+        !haEnabled) {
+      blockManager.initializeReplQueues();
+    }
+  }
+
+  /**
+   * @return true if both block and datanode threshold are met else false.
+   */
+  private boolean areThresholdsMet() {
+    assert namesystem.hasWriteLock();
+    int datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
+    synchronized (this) {
+      return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
+    }
+  }
+
+  /**
+   * Checks consistency of the class state.
+   * This is costly so only runs if asserts are enabled.
+   */
+  private void doConsistencyCheck() {
+    boolean assertsOn = false;
+    assert assertsOn = true; // set to true if asserts are on
+    if (!assertsOn) {
+      return;
+    }
+
+    int activeBlocks = blockManager.getActiveBlockCount();
+    synchronized (this) {
+      if (blockTotal != activeBlocks &&
+          !(blockSafe >= 0 && blockSafe <= blockTotal)) {
+        LOG.warn("SafeMode is in inconsistent filesystem state. " +
+            "BlockManagerSafeMode data: blockTotal={}, blockSafe={}; " +
+            "BlockManager data: activeBlocks={}",
+            blockTotal, blockSafe, activeBlocks);
+      }
+    }
+  }
+
+  /**
+   * Print status every 20 seconds.
+   */
+  private void reportStatus(String msg, boolean rightNow) {
+    assert namesystem.hasWriteLock();
+    long curTime = monotonicNow();
+    if(!rightNow && (curTime - lastStatusReport < 20 * 1000)) {
+      return;
+    }
+    NameNode.stateChangeLog.info(msg + " \n" + getSafeModeTip());
+    lastStatusReport = curTime;
+  }
+
+  /**
+   * Periodically check whether it is time to leave safe mode.
+   * This thread starts when the threshold level is reached.
+   */
+  private class SafeModeMonitor implements Runnable {
+    /** Interval in msec for checking safe mode. */
+    private static final long RECHECK_INTERVAL = 1000;
+
+    @Override
+    public void run() {
+      while (namesystem.isRunning()) {
+        try {
+          namesystem.writeLock();
+          if (status == BMSafeModeStatus.OFF) { // Not in safe mode.
+            break;
+          }
+          if (canLeave()) {
+            // EXTENSION -> OFF
+            leaveSafeMode(false);
+            break;
+          }
+        } finally {
+          namesystem.writeUnlock();
+        }
+
+        try {
+          Thread.sleep(RECHECK_INTERVAL);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      if (!namesystem.isRunning()) {
+        LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
+      }
+    }
+
+    /**
+     * Check whether the safe mode can be turned off by this monitor.
+     *
+     * Safe mode can be turned off iff
+     * the threshold is reached, and
+     * the extension time has passed.
+     */
+    private boolean canLeave() {
+      if (timeToLeaveExtension() > 0) {
+        reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
+        return false;
+      } else if (!areThresholdsMet()) {
+        reportStatus("STATE* Safe mode ON, thresholds not met.", false);
+        return false;
+      } else {
+        return true;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index d35b237..f758454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -88,7 +88,7 @@ public class DatanodeManager {
   private final Map<String, DatanodeDescriptor> datanodeMap
       = new HashMap<>();
 
-  /** Cluster network topology */
+  /** Cluster network topology. */
   private final NetworkTopology networktopology;
 
   /** Host names to datanode descriptors mapping. */
@@ -105,7 +105,7 @@ public class DatanodeManager {
 
   private final int defaultIpcPort;
 
-  /** Read include/exclude files*/
+  /** Read include/exclude files. */
   private final HostFileManager hostFileManager = new HostFileManager();
 
   /** The period to wait for datanode heartbeat.*/
@@ -560,7 +560,7 @@ public class DatanodeManager {
     if (LOG.isDebugEnabled()) {
       LOG.debug("remove datanode " + nodeInfo);
     }
-    namesystem.checkSafeMode();
+    blockManager.checkSafeMode();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
index 83d835ac..a782049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
@@ -256,7 +256,8 @@ class Checkpointer extends Daemon {
       if(backupNode.namesystem.getBlocksTotal() > 0) {
         long completeBlocksTotal =
             backupNode.namesystem.getCompleteBlocksTotal();
-        backupNode.namesystem.setBlockTotal(completeBlocksTotal);
+        backupNode.namesystem.getBlockManager().setBlockTotal(
+            completeBlocksTotal);
       }
       bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
       if (!backupNode.namesystem.isRollingUpgrade()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index 006fbc2..6db2ce8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -150,7 +150,7 @@ class FSDirDeleteOp {
 
     if (filesRemoved) {
       fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
-      fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+      fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 210a060..c64dfea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -312,7 +312,8 @@ class FSDirRenameOp {
     unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp,
         collectedBlocks, options);
     if (!collectedBlocks.getToDeleteList().isEmpty()) {
-      fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+      fsd.getFSNamesystem().getBlockManager()
+          .removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index b46a195..03eb96d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -191,7 +191,7 @@ final class FSDirTruncateOp {
     }
     assert onBlockBoundary == (truncateBlock == null) :
       "truncateBlock is null iff on block boundary: " + truncateBlock;
-    fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+    fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 6819d8d..23683d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -747,7 +747,8 @@ public class FSEditLogLoader {
           deleteSnapshotOp.snapshotName,
           new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
               collectedBlocks, removedINodes, null));
-      fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+      fsNamesys.getBlockManager().removeBlocksAndUpdateSafemodeTotal(
+          collectedBlocks);
       collectedBlocks.clear();
       fsNamesys.dir.removeFromInodeMap(removedINodes);
       removedINodes.clear();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 89df008..6af7265 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -69,18 +69,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FIL
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
@@ -236,8 +230,6 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
@@ -267,7 +259,6 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -440,8 +431,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   final LeaseManager leaseManager = new LeaseManager(this); 
 
-  volatile Daemon smmthread = null;  // SafeModeMonitor thread
-  
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
 
   Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
@@ -479,8 +468,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final FsServerDefaults serverDefaults;
   private final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
 
-  private volatile SafeModeInfo safeMode;  // safe mode information
-
   private final long maxFsObjects;          // maximum number of fs objects
 
   private final long minBlockSize;         // minimum block size
@@ -536,6 +523,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private INodeAttributeProvider inodeAttributeProvider;
 
   /**
+   * If the NN is in safemode, and not due to manual / low resources, we
+   * assume it must be because of startup. If the NN had low resources during
+   * startup, we assume it came out of startup safemode and it is now in low
+   * resources safemode.
+   */
+  private boolean manualSafeMode = false;
+  private boolean resourceLowSafeMode = false;
+
+  /**
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
    */
@@ -606,7 +602,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager;
   }
   
-  boolean isHaEnabled() {
+  @Override
+  public boolean isHaEnabled() {
     return haEnabled;
   }
 
@@ -742,10 +739,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
 
-      this.blockManager = new BlockManager(this, conf);
-      this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
-      this.blockIdManager = new BlockIdManager(blockManager);
-
       this.fsOwner = UserGroupInformation.getCurrentUser();
       this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
@@ -771,8 +764,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             "must not be specified if HA is not enabled.");
       }
 
+      // BlockManager must be created after haEnabled has been initialized
+      this.blockManager = new BlockManager(this, conf);
+      this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
+      this.blockIdManager = new BlockIdManager(blockManager);
+
       // Get the checksum type from config
-      String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
+      String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY,
+          DFS_CHECKSUM_TYPE_DEFAULT);
       DataChecksum.Type checksumType;
       try {
          checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
@@ -836,7 +835,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.snapshotManager = new SnapshotManager(dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
       this.ecPolicyManager = new ErasureCodingPolicyManager();
-      this.safeMode = new SafeModeInfo(conf);
       this.topConf = new TopConf(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -1023,7 +1021,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
   
-  private void startSecretManagerIfNecessary() {
+  @Override
+  public void startSecretManagerIfNecessary() {
     boolean shouldRun = shouldUseDelegationTokens() &&
       !isInSafeMode() && getEditLog().isOpenForWrite();
     boolean running = dtSecretManager.isRunning();
@@ -1048,14 +1047,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
-      assert safeMode != null && !blockManager.isPopulatingReplQueues();
+      assert !blockManager.isPopulatingReplQueues();
       StartupProgress prog = NameNode.getStartupProgress();
       prog.beginPhase(Phase.SAFEMODE);
       long completeBlocksTotal = getCompleteBlocksTotal();
       prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
           completeBlocksTotal);
-      setBlockTotal(completeBlocksTotal);
-      blockManager.activate(conf);
+      blockManager.activate(conf, completeBlocksTotal);
     } finally {
       writeUnlock();
     }
@@ -1123,7 +1121,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
               "replication and invalidation queues during failover:\n" +
               metaSaveAsString());
         }
-        
+
         long nextTxId = getFSImage().getLastAppliedTxId() + 1;
         LOG.info("Will take over writing edit logs at txnid " + 
             nextTxId);
@@ -1167,7 +1165,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
     } finally {
       startingActiveService = false;
-      checkSafeMode();
       writeUnlock();
     }
   }
@@ -1177,10 +1174,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         haContext.getState().getServiceState() == HAServiceState.ACTIVE;
   }
 
-  /**
-   * @return Whether the namenode is transitioning to active state and is in the
-   *         middle of the {@link #startActiveServices()}
-   */
+  @Override
   public boolean inTransitionToActive() {
     return haEnabled && inActiveState() && startingActiveService;
   }
@@ -1317,7 +1311,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       SafeModeException se = newSafemodeException(errorMsg);
       if (haEnabled && haContext != null
           && haContext.getState().getServiceState() == HAServiceState.ACTIVE
-          && shouldRetrySafeMode(this.safeMode)) {
+          && isInStartupSafeMode()) {
         throw new RetriableException(se);
       } else {
         throw se;
@@ -1327,25 +1321,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private SafeModeException newSafemodeException(String errorMsg) {
     return new SafeModeException(errorMsg + ". Name node is in safe " +
-        "mode.\n" + safeMode.getTurnOffTip());
+        "mode.\n" + getSafeModeTip());
   }
 
   boolean isPermissionEnabled() {
     return isPermissionEnabled;
   }
 
-  /**
-   * We already know that the safemode is on. We will throw a RetriableException
-   * if the safemode is not manual or caused by low resource.
-   */
-  private boolean shouldRetrySafeMode(SafeModeInfo safeMode) {
-    if (safeMode == null) {
-      return false;
-    } else {
-      return !safeMode.isManual() && !safeMode.areResourcesLow();
-    }
-  }
-  
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
     return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
@@ -1579,7 +1561,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     fsRunning = false;
     try {
       stopCommonServices();
-      if (smmthread != null) smmthread.interrupt();
     } finally {
       // using finally to ensure we also wait for lease daemon
       try {
@@ -2813,54 +2794,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Removes the blocks from blocksmap and updates the safemode blocks total
-   * 
-   * @param blocks
-   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
-   *          of blocks that need to be removed from blocksMap
-   */
-  void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
-    assert hasWriteLock();
-    // In the case that we are a Standby tailing edits from the
-    // active while in safe-mode, we need to track the total number
-    // of blocks and safe blocks in the system.
-    boolean trackBlockCounts = isSafeModeTrackingBlocks();
-    int numRemovedComplete = 0, numRemovedSafe = 0;
-
-    for (BlockInfo b : blocks.getToDeleteList()) {
-      if (trackBlockCounts) {
-        if (b.isComplete()) {
-          numRemovedComplete++;
-          if (blockManager.hasMinStorage(b, b.numNodes())) {
-            numRemovedSafe++;
-          }
-        }
-      }
-      blockManager.removeBlock(b);
-    }
-    if (trackBlockCounts) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adjusting safe-mode totals for deletion."
-            + "decreasing safeBlocks by " + numRemovedSafe
-            + ", totalBlocks by " + numRemovedComplete);
-      }
-      adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
-    }
-  }
-
-  /**
-   * @see SafeModeInfo#shouldIncrementallyTrackBlocks
-   */
-  private boolean isSafeModeTrackingBlocks() {
-    if (!haEnabled) {
-      // Never track blocks incrementally in non-HA code.
-      return false;
-    }
-    SafeModeInfo sm = this.safeMode;
-    return sm != null && sm.shouldIncrementallyTrackBlocks();
-  }
-
-  /**
    * Get the file info for a specific file.
    *
    * @param src The string representation of the path to the file
@@ -3587,8 +3520,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
     writeLock();
     try {
-      getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
-      checkSafeMode();
+      blockManager.registerDatanode(nodeReg);
     } finally {
       writeUnlock();
     }
@@ -4142,575 +4074,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
   }
 
-  /**
-   * SafeModeInfo contains information related to the safe mode.
-   * <p>
-   * An instance of {@link SafeModeInfo} is created when the name node
-   * enters safe mode.
-   * <p>
-   * During name node startup {@link SafeModeInfo} counts the number of
-   * <em>safe blocks</em>, those that have at least the minimal number of
-   * replicas, and calculates the ratio of safe blocks to the total number
-   * of blocks in the system, which is the size of blocks in
-   * {@link FSNamesystem#blockManager}. When the ratio reaches the
-   * {@link #threshold} it starts the SafeModeMonitor daemon in order
-   * to monitor whether the safe mode {@link #extension} is passed.
-   * Then it leaves safe mode and destroys itself.
-   * <p>
-   * If safe mode is turned on manually then the number of safe blocks is
-   * not tracked because the name node is not intended to leave safe mode
-   * automatically in the case.
-   *
-   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
-   */
-  public class SafeModeInfo {
-    // configuration fields
-    /** Safe mode threshold condition %.*/
-    private final double threshold;
-    /** Safe mode minimum number of datanodes alive */
-    private final int datanodeThreshold;
-    /**
-     * Safe mode extension after the threshold.
-     * Make it volatile so that getSafeModeTip can read the latest value
-     * without taking a lock.
-     */
-    private volatile int extension;
-    /** Min replication required by safe mode. */
-    private final int safeReplication;
-    /** threshold for populating needed replication queues */
-    private final double replQueueThreshold;
-    // internal fields
-    /** Time when threshold was reached.
-     * <br> -1 safe mode is off
-     * <br> 0 safe mode is on, and threshold is not reached yet
-     * <br> >0 safe mode is on, but we are in extension period 
-     */
-    private long reached = -1;  
-    private long reachedTimestamp = -1;
-    /** Total number of blocks. */
-    int blockTotal; 
-    /** Number of safe blocks. */
-    int blockSafe;
-    /** Number of blocks needed to satisfy safe mode threshold condition */
-    private int blockThreshold;
-    /** Number of blocks needed before populating replication queues */
-    private int blockReplQueueThreshold;
-    /** time of the last status printout */
-    private long lastStatusReport = 0;
-    /**
-     * Was safemode entered automatically because available resources were low.
-     * Make it volatile so that getSafeModeTip can read the latest value
-     * without taking a lock.
-     */
-    private volatile boolean resourcesLow = false;
-    /** Should safemode adjust its block totals as blocks come in */
-    private boolean shouldIncrementallyTrackBlocks = false;
-    /** counter for tracking startup progress of reported blocks */
-    private Counter awaitingReportedBlocksCounter;
-    
-    /**
-     * Creates SafeModeInfo when the name node enters
-     * automatic safe mode at startup.
-     *  
-     * @param conf configuration
-     */
-    private SafeModeInfo(Configuration conf) {
-      this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
-          DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
-      if(threshold > 1.0) {
-        LOG.warn("The threshold value should't be greater than 1, threshold: " + threshold);
-      }
-      this.datanodeThreshold = conf.getInt(
-        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
-        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
-      this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
-      int minReplication =
-          conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
-      // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
-      // setting this lower than the min replication is not recommended
-      // and/or dangerous for production setups.
-      // When it's unset, safeReplication will use dfs.namenode.replication.min
-      this.safeReplication =
-          conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
-              minReplication);
-
-      LOG.info(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY + " = " + threshold);
-      LOG.info(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY + " = " + datanodeThreshold);
-      LOG.info(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + "     = " + extension);
-
-      // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
-      this.replQueueThreshold = 
-        conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
-                      (float) threshold);
-      this.blockTotal = 0; 
-      this.blockSafe = 0;
-    }
-
-    /**
-     * In the HA case, the StandbyNode can be in safemode while the namespace
-     * is modified by the edit log tailer. In this case, the number of total
-     * blocks changes as edits are processed (eg blocks are added and deleted).
-     * However, we don't want to do the incremental tracking during the
-     * startup-time loading process -- only once the initial total has been
-     * set after the image has been loaded.
-     */
-    private boolean shouldIncrementallyTrackBlocks() {
-      return shouldIncrementallyTrackBlocks;
-    }
-
-    /**
-     * Creates SafeModeInfo when safe mode is entered manually, or because
-     * available resources are low.
-     *
-     * The {@link #threshold} is set to 1.5 so that it could never be reached.
-     * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
-     * 
-     * @see SafeModeInfo
-     */
-    private SafeModeInfo(boolean resourcesLow) {
-      this.threshold = 1.5f;  // this threshold can never be reached
-      this.datanodeThreshold = Integer.MAX_VALUE;
-      this.extension = Integer.MAX_VALUE;
-      this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
-      this.replQueueThreshold = 1.5f; // can never be reached
-      this.blockTotal = -1;
-      this.blockSafe = -1;
-      this.resourcesLow = resourcesLow;
-      enter();
-      reportStatus("STATE* Safe mode is ON.", true);
-    }
-      
-    /**
-     * Check if safe mode is on.
-     * @return true if in safe mode
-     */
-    private synchronized boolean isOn() {
-      doConsistencyCheck();
-      return this.reached >= 0;
-    }
-      
-    /**
-     * Enter safe mode.
-     */
-    private void enter() {
-      this.reached = 0;
-      this.reachedTimestamp = 0;
-    }
-      
-    /**
-     * Leave safe mode.
-     * <p>
-     * Check for invalid, under- & over-replicated blocks in the end of startup.
-     * @param force - true to force exit
-     */
-    private synchronized void leave(boolean force) {
-      // if not done yet, initialize replication queues.
-      // In the standby, do not populate repl queues
-      if (!blockManager.isPopulatingReplQueues() && blockManager.shouldPopulateReplQueues()) {
-        blockManager.initializeReplQueues();
-      }
-
-
-      if (!force && (blockManager.getBytesInFuture() > 0)) {
-        LOG.error("Refusing to leave safe mode without a force flag. " +
-            "Exiting safe mode will cause a deletion of " + blockManager
-            .getBytesInFuture() + " byte(s). Please use " +
-            "-forceExit flag to exit safe mode forcefully if data loss is " +
-            "acceptable.");
-        return;
-      }
-
-      long timeInSafemode = now() - startTime;
-      NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
-                                    + timeInSafemode/1000 + " secs");
-      NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
-
-      //Log the following only once (when transitioning from ON -> OFF)
-      if (reached >= 0) {
-        NameNode.stateChangeLog.info("STATE* Safe mode is OFF"); 
-      }
-      reached = -1;
-      reachedTimestamp = -1;
-      safeMode = null;
-      final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
-      NameNode.stateChangeLog.info("STATE* Network topology has "
-          + nt.getNumOfRacks() + " racks and "
-          + nt.getNumOfLeaves() + " datanodes");
-      NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
-          + blockManager.numOfUnderReplicatedBlocks() + " blocks");
-
-      startSecretManagerIfNecessary();
-
-      // If startup has not yet completed, end safemode phase.
-      StartupProgress prog = NameNode.getStartupProgress();
-      if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
-        prog.endStep(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS);
-        prog.endPhase(Phase.SAFEMODE);
-      }
-    }
-
-    /**
-     * Check whether we have reached the threshold for 
-     * initializing replication queues.
-     */
-    private synchronized boolean canInitializeReplQueues() {
-      return blockManager.shouldPopulateReplQueues()
-          && blockSafe >= blockReplQueueThreshold;
-    }
-      
-    /** 
-     * Safe mode can be turned off iff 
-     * the threshold is reached and 
-     * the extension time have passed.
-     * @return true if can leave or false otherwise.
-     */
-    private synchronized boolean canLeave() {
-      if (reached == 0) {
-        return false;
-      }
-
-      if (monotonicNow() - reached < extension) {
-        reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
-        return false;
-      }
-
-      if (needEnter()) {
-        reportStatus("STATE* Safe mode ON, thresholds not met.", false);
-        return false;
-      }
-
-      return true;
-    }
-      
-    /** 
-     * There is no need to enter safe mode 
-     * if DFS is empty or {@link #threshold} == 0
-     */
-    private boolean needEnter() {
-      return (threshold != 0 && blockSafe < blockThreshold) ||
-        (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
-        (!nameNodeHasResourcesAvailable());
-    }
-      
-    /**
-     * Check and trigger safe mode if needed. 
-     */
-    private void checkMode() {
-      // Have to have write-lock since leaving safemode initializes
-      // repl queues, which requires write lock
-      assert hasWriteLock();
-      if (inTransitionToActive()) {
-        return;
-      }
-      // if smmthread is already running, the block threshold must have been 
-      // reached before, there is no need to enter the safe mode again
-      if (smmthread == null && needEnter()) {
-        enter();
-        // check if we are ready to initialize replication queues
-        if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues()
-            && !haEnabled) {
-          blockManager.initializeReplQueues();
-        }
-        reportStatus("STATE* Safe mode ON.", false);
-        return;
-      }
-      // the threshold is reached or was reached before
-      if (!isOn() ||                           // safe mode is off
-          extension <= 0 || threshold <= 0) {  // don't need to wait
-        this.leave(false); // leave safe mode
-        return;
-      }
-      if (reached > 0) {  // threshold has already been reached before
-        reportStatus("STATE* Safe mode ON.", false);
-        return;
-      }
-      // start monitor
-      reached = monotonicNow();
-      reachedTimestamp = now();
-      if (smmthread == null) {
-        smmthread = new Daemon(new SafeModeMonitor());
-        smmthread.start();
-        reportStatus("STATE* Safe mode extension entered.", true);
-      }
-
-      // check if we are ready to initialize replication queues
-      if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues() && !haEnabled) {
-        blockManager.initializeReplQueues();
-      }
-    }
-      
-    /**
-     * Set total number of blocks.
-     */
-    private synchronized void setBlockTotal(int total) {
-      this.blockTotal = total;
-      this.blockThreshold = (int) (blockTotal * threshold);
-      this.blockReplQueueThreshold = 
-        (int) (blockTotal * replQueueThreshold);
-      if (haEnabled) {
-        // After we initialize the block count, any further namespace
-        // modifications done while in safe mode need to keep track
-        // of the number of total blocks in the system.
-        this.shouldIncrementallyTrackBlocks = true;
-      }
-      if(blockSafe < 0)
-        this.blockSafe = 0;
-      checkMode();
-    }
-      
-    /**
-     * Increment number of safe blocks if current block has 
-     * reached minimal replication.
-     * @param storageNum current number of replicas or number of internal blocks
-     *                   of a striped block group
-     * @param storedBlock current storedBlock which is either a
-     *                    BlockInfoContiguous or a BlockInfoStriped
-     */
-    private synchronized void incrementSafeBlockCount(short storageNum,
-        BlockInfo storedBlock) {
-      final int safe = storedBlock.isStriped() ?
-          ((BlockInfoStriped) storedBlock).getRealDataBlockNum() : safeReplication;
-      if (storageNum == safe) {
-        this.blockSafe++;
-
-        // Report startup progress only if we haven't completed startup yet.
-        StartupProgress prog = NameNode.getStartupProgress();
-        if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
-          if (this.awaitingReportedBlocksCounter == null) {
-            this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
-              STEP_AWAITING_REPORTED_BLOCKS);
-          }
-          this.awaitingReportedBlocksCounter.increment();
-        }
-
-        checkMode();
-      }
-    }
-      
-    /**
-     * Decrement number of safe blocks if current block has 
-     * fallen below minimal replication.
-     * @param replication current replication 
-     */
-    private synchronized void decrementSafeBlockCount(short replication) {
-      if (replication == safeReplication-1) {
-        this.blockSafe--;
-        //blockSafe is set to -1 in manual / low resources safemode
-        assert blockSafe >= 0 || isManual() || areResourcesLow();
-        checkMode();
-      }
-    }
-
-    /**
-     * Check if safe mode was entered manually
-     */
-    private boolean isManual() {
-      return extension == Integer.MAX_VALUE;
-    }
-
-    /**
-     * Set manual safe mode.
-     */
-    private synchronized void setManual() {
-      extension = Integer.MAX_VALUE;
-    }
-
-    /**
-     * Check if safe mode was entered due to resources being low.
-     */
-    private boolean areResourcesLow() {
-      return resourcesLow;
-    }
-
-    /**
-     * Set that resources are low for this instance of safe mode.
-     */
-    private void setResourcesLow() {
-      resourcesLow = true;
-    }
-
-    /**
-     * A tip on how safe mode is to be turned off: manually or automatically.
-     */
-    String getTurnOffTip() {
-      if(!isOn()) {
-        return "Safe mode is OFF.";
-      }
-
-      //Manual OR low-resource safemode. (Admin intervention required)
-      String adminMsg = "It was turned on manually. ";
-      if (areResourcesLow()) {
-        adminMsg = "Resources are low on NN. Please add or free up more "
-          + "resources then turn off safe mode manually. NOTE:  If you turn off"
-          + " safe mode before adding resources, "
-          + "the NN will immediately return to safe mode. ";
-      }
-      if (isManual() || areResourcesLow()) {
-        return adminMsg
-          + "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off.";
-      }
-
-      boolean thresholdsMet = true;
-      int numLive = getNumLiveDataNodes();
-      String msg = "";
-      if (blockSafe < blockThreshold) {
-        msg += String.format(
-          "The reported blocks %d needs additional %d"
-          + " blocks to reach the threshold %.4f of total blocks %d.%n",
-                blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
-        thresholdsMet = false;
-      } else {
-        msg += String.format("The reported blocks %d has reached the threshold"
-            + " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal);
-      }
-      if (numLive < datanodeThreshold) {
-        msg += String.format(
-          "The number of live datanodes %d needs an additional %d live "
-          + "datanodes to reach the minimum number %d.%n",
-          numLive, (datanodeThreshold - numLive), datanodeThreshold);
-        thresholdsMet = false;
-      } else {
-        msg += String.format("The number of live datanodes %d has reached "
-            + "the minimum number %d. ",
-            numLive, datanodeThreshold);
-      }
-
-      if(blockManager.getBytesInFuture() > 0) {
-        msg += "Name node detected blocks with generation stamps " +
-            "in future. This means that Name node metadata is inconsistent." +
-            "This can happen if Name node metadata files have been manually " +
-            "replaced. Exiting safe mode will cause loss of " + blockManager
-            .getBytesInFuture() + " byte(s). Please restart name node with " +
-            "right metadata or use \"hdfs dfsadmin -safemode forceExit" +
-            "if you are certain that the NameNode was started with the" +
-            "correct FsImage and edit logs. If you encountered this during" +
-            "a rollback, it is safe to exit with -safemode forceExit.";
-        return msg;
-      }
-
-
-      msg += (reached > 0) ? "In safe mode extension. " : "";
-      msg += "Safe mode will be turned off automatically ";
-
-      if (!thresholdsMet) {
-        msg += "once the thresholds have been reached.";
-      } else if (reached + extension - monotonicNow() > 0) {
-        msg += ("in " + (reached + extension - monotonicNow()) / 1000 + " seconds.");
-      } else {
-        msg += "soon.";
-      }
-
-      return msg;
-    }
-
-    /**
-     * Print status every 20 seconds.
-     */
-    private void reportStatus(String msg, boolean rightNow) {
-      long curTime = now();
-      if(!rightNow && (curTime - lastStatusReport < 20 * 1000))
-        return;
-      NameNode.stateChangeLog.info(msg + " \n" + getTurnOffTip());
-      lastStatusReport = curTime;
-    }
-
-    @Override
-    public String toString() {
-      String resText = "Current safe blocks = " 
-        + blockSafe 
-        + ". Target blocks = " + blockThreshold + " for threshold = %" + threshold
-        + ". Minimal replication = " + safeReplication + ".";
-      if (reached > 0) 
-        resText += " Threshold was reached " + new Date(reachedTimestamp) + ".";
-      return resText;
-    }
-      
-    /**
-     * Checks consistency of the class state.
-     * This is costly so only runs if asserts are enabled.
-     */
-    private void doConsistencyCheck() {
-      boolean assertsOn = false;
-      assert assertsOn = true; // set to true if asserts are on
-      if (!assertsOn) return;
-      
-      if (blockTotal == -1 && blockSafe == -1) {
-        return; // manual safe mode
-      }
-      int activeBlocks = blockManager.getActiveBlockCount();
-      if ((blockTotal != activeBlocks) &&
-          !(blockSafe >= 0 && blockSafe <= blockTotal)) {
-        throw new AssertionError(
-            " SafeMode: Inconsistent filesystem state: "
-        + "SafeMode data: blockTotal=" + blockTotal
-        + " blockSafe=" + blockSafe + "; "
-        + "BlockManager data: active="  + activeBlocks);
-      }
-    }
-
-    private synchronized void adjustBlockTotals(int deltaSafe, int deltaTotal) {
-      if (!shouldIncrementallyTrackBlocks) {
-        return;
-      }
-      assert haEnabled;
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adjusting block totals from " +
-            blockSafe + "/" + blockTotal + " to " +
-            (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
-      }
-      assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
-        blockSafe + " by " + deltaSafe + ": would be negative";
-      assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
-        blockTotal + " by " + deltaTotal + ": would be negative";
-      
-      blockSafe += deltaSafe;
-      setBlockTotal(blockTotal + deltaTotal);
-    }
-  }
-    
-  /**
-   * Periodically check whether it is time to leave safe mode.
-   * This thread starts when the threshold level is reached.
-   *
-   */
-  class SafeModeMonitor implements Runnable {
-    /** interval in msec for checking safe mode: {@value} */
-    private static final long recheckInterval = 1000;
-      
-    /**
-     */
-    @Override
-    public void run() {
-      while (fsRunning) {
-        writeLock();
-        try {
-          if (safeMode == null) { // Not in safe mode.
-            break;
-          }
-          if (safeMode.canLeave()) {
-            // Leave safe mode.
-            safeMode.leave(false);
-            smmthread = null;
-            break;
-          }
-        } finally {
-          writeUnlock();
-        }
-
-        try {
-          Thread.sleep(recheckInterval);
-        } catch (InterruptedException ie) {
-          // Ignored
-        }
-      }
-      if (!fsRunning) {
-        LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
-      }
-    }
-  }
-    
   boolean setSafeMode(SafeModeAction action) throws IOException {
     if (action != SafeModeAction.SAFEMODE_GET) {
       checkSuperuserPrivilege();
@@ -4722,9 +4085,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
               .getBytesInFuture() + " byte(s). Please use " +
               "-forceExit flag to exit safe mode forcefully and data loss is " +
               "acceptable.");
-          return isInSafeMode();
+        } else {
+          leaveSafeMode();
         }
-        leaveSafeMode();
         break;
       case SAFEMODE_ENTER: // enter safe mode
         enterSafeMode(false);
@@ -4733,7 +4096,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (blockManager.getBytesInFuture() > 0) {
           LOG.warn("Leaving safe mode due to forceExit. This will cause a data "
               + "loss of " + blockManager.getBytesInFuture() + " byte(s).");
-          safeMode.leave(true);
           blockManager.clearBytesInFuture();
         } else {
           LOG.warn("forceExit used when normal exist would suffice. Treating " +
@@ -4748,85 +4110,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return isInSafeMode();
   }
 
-  @Override
-  public void checkSafeMode() {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
-  }
-
-  @Override
-  public boolean isInSafeMode() {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null)
-      return false;
-    return safeMode.isOn();
-  }
-
-  @Override
-  public boolean isInStartupSafeMode() {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null)
-      return false;
-    // If the NN is in safemode, and not due to manual / low resources, we
-    // assume it must be because of startup. If the NN had low resources during
-    // startup, we assume it came out of startup safemode and it is now in low
-    // resources safemode
-    return !safeMode.isManual() && !safeMode.areResourcesLow()
-      && safeMode.isOn();
-  }
-
-  @Override
-  public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null)
-      return;
-    safeMode.incrementSafeBlockCount((short) storageNum, storedBlock);
-  }
-
-  @Override
-  public void decrementSafeBlockCount(BlockInfo b) {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null) // mostly true
-      return;
-    BlockInfo storedBlock = getStoredBlock(b);
-    if (storedBlock.isComplete()) {
-      safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
-    }
-  }
-  
-  /**
-   * Adjust the total number of blocks safe and expected during safe mode.
-   * If safe mode is not currently on, this is a no-op.
-   * @param deltaSafe the change in number of safe blocks
-   * @param deltaTotal the change i nnumber of total blocks expected
-   */
-  @Override
-  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null)
-      return;
-    safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
-  }
-
-  /**
-   * Set the total number of blocks in the system. 
-   */
-  public void setBlockTotal(long completeBlocksTotal) {
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null)
-      return;
-    safeMode.setBlockTotal((int) completeBlocksTotal);
-  }
-
   /**
    * Get the total number of blocks in the system. 
    */
@@ -4870,6 +4153,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+
+  @Override
+  public boolean isInSafeMode() {
+    return isInManualOrResourceLowSafeMode() || blockManager.isInSafeMode();
+  }
+
+  @Override
+  public boolean isInStartupSafeMode() {
+    return !isInManualOrResourceLowSafeMode() && blockManager.isInSafeMode();
+  }
+
   /**
    * Enter safe mode. If resourcesLow is false, then we assume it is manual
    * @throws IOException
@@ -4890,20 +4184,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (isEditlogOpenForWrite) {
         getEditLog().logSyncAll();
       }
-      if (!isInSafeMode()) {
-        safeMode = new SafeModeInfo(resourcesLow);
-        return;
-      }
-      if (resourcesLow) {
-        safeMode.setResourcesLow();
-      } else {
-        safeMode.setManual();
-      }
+      setManualAndResourceLowSafeMode(!resourcesLow, resourcesLow);
+      NameNode.stateChangeLog.info("STATE* Safe mode is ON.\n" +
+          getSafeModeTip());
       if (isEditlogOpenForWrite) {
         getEditLog().logSyncAll();
       }
-      NameNode.stateChangeLog.info("STATE* Safe mode is ON"
-          + safeMode.getTurnOffTip());
+      NameNode.stateChangeLog.info("STATE* Safe mode is ON" + getSafeModeTip());
     } finally {
       writeUnlock();
     }
@@ -4919,29 +4206,40 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         NameNode.stateChangeLog.info("STATE* Safe mode is already OFF"); 
         return;
       }
-      safeMode.leave(false);
+      setManualAndResourceLowSafeMode(false, false);
+      blockManager.leaveSafeMode(true);
     } finally {
       writeUnlock();
     }
   }
-    
+
   String getSafeModeTip() {
-    // There is no need to take readLock.
-    // Don't use isInSafeMode as this.safeMode might be set to null.
-    // after isInSafeMode returns.
-    boolean inSafeMode;
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null) {
-      inSafeMode = false;
-    } else {
-      inSafeMode = safeMode.isOn();
+    String cmd = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off.";
+    synchronized (this) {
+      if (resourceLowSafeMode) {
+        return "Resources are low on NN. Please add or free up more resources "
+            + "then turn off safe mode manually. NOTE:  If you turn off safe "
+            + "mode before adding resources, the NN will immediately return to "
+            + "safe mode. " + cmd;
+      } else if (manualSafeMode) {
+        return "It was turned on manually. " + cmd;
+      }
     }
 
-    if (!inSafeMode) {
-      return "";
-    } else {
-      return safeMode.getTurnOffTip();
-    }
+    return blockManager.getSafeModeTip();
+  }
+
+  /**
+   * @return true iff it is in manual safe mode or resource low safe mode.
+   */
+  private synchronized boolean isInManualOrResourceLowSafeMode() {
+    return manualSafeMode || resourceLowSafeMode;
+  }
+
+  private synchronized void setManualAndResourceLowSafeMode(boolean manual,
+      boolean resourceLow) {
+    this.manualSafeMode = manual;
+    this.resourceLowSafeMode = resourceLow;
   }
 
   CheckpointSignature rollEditLog() throws IOException {
@@ -6428,11 +5726,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public ReentrantLock getCpLockForTests() {
     return cpLock;
   }
-
-  @VisibleForTesting
-  public SafeModeInfo getSafeModeInfoForTests() {
-    return safeMode;
-  }
   
   @VisibleForTesting
   public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
@@ -7559,11 +6852,5 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return blockManager.getBytesInFuture();
   }
 
-  @VisibleForTesting
-  synchronized void enableSafeModeForTesting(Configuration conf) {
-    SafeModeInfo newSafemode = new SafeModeInfo(conf);
-    newSafemode.enter();
-    this.safeMode = newSafemode;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 7371d84..c3f3017 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -383,7 +383,7 @@ public class NameNode implements NameNodeStatusMXBean {
     return rpcServer;
   }
   
-  static void initMetrics(Configuration conf, NamenodeRole role) {
+  public static void initMetrics(Configuration conf, NamenodeRole role) {
     metrics = NameNodeMetrics.create(conf, role);
   }
 
@@ -1682,11 +1682,9 @@ public class NameNode implements NameNodeStatusMXBean {
     HAServiceState retState = state.getServiceState();
     HAServiceStatus ret = new HAServiceStatus(retState);
     if (retState == HAServiceState.STANDBY) {
-      String safemodeTip = namesystem.getSafeModeTip();
-      if (!safemodeTip.isEmpty()) {
-        ret.setNotReadyToBecomeActive(
-            "The NameNode is in safemode. " +
-            safemodeTip);
+      if (namesystem.isInSafeMode()) {
+        ret.setNotReadyToBecomeActive("The NameNode is in safemode. " +
+            namesystem.getSafeModeTip());
       } else {
         ret.setReadyToBecomeActive();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index b1012c2..59ad092 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -48,10 +48,10 @@ public interface Namesystem extends RwLock, SafeMode {
 
   BlockCollection getBlockCollection(long id);
 
-  void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
-
   void checkOperation(OperationCategory read) throws StandbyException;
 
+  void startSecretManagerIfNecessary();
+
   /**
    * Gets the erasure coding policy for the path
    * @param src
@@ -67,4 +67,15 @@ public interface Namesystem extends RwLock, SafeMode {
   CacheManager getCacheManager();
 
   HAContext getHAContext();
+
+  /**
+   * @return true if the HA is enabled else false
+   */
+  boolean isHaEnabled();
+
+  /**
+   * @return Whether the namenode is transitioning to active state and is in the
+   *         middle of starting active services.
+   */
+  boolean inTransitionToActive();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
index 98deed2..9eb5796 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
@@ -18,18 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 
 /** SafeMode related operations. */
 @InterfaceAudience.Private
 public interface SafeMode {
-  /**
-   * Check safe mode conditions.
-   * If the corresponding conditions are satisfied,
-   * trigger the system to enter/leave safe mode.
-   */
-  public void checkSafeMode();
-
   /** Is the system in safe mode? */
   public boolean isInSafeMode();
 
@@ -38,14 +30,4 @@ public interface SafeMode {
    * safe mode turned on automatically?
    */
   public boolean isInStartupSafeMode();
-
-  /**
-   * Increment number of blocks that reached minimal replication.
-   * @param replication current replication
-   * @param storedBlock current stored Block
-   */
-  public void incrementSafeBlockCount(int replication, BlockInfo storedBlock);
-
-  /** Decrement number of blocks that reached minimal replication. */
-  public void decrementSafeBlockCount(BlockInfo b);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a49cc74b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
index 6b4e46a..c26fc75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
@@ -219,7 +219,7 @@ public class TestSafeMode {
       }
     }, 10, 10000);
 
-    final int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
+    final long safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
     assertTrue("Expected first block report to make some blocks safe.", safe > 0);
     assertTrue("Did not expect first block report to make all blocks safe.", safe < 15);
 


Mime
View raw message