hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject [04/51] [abbrv] hadoop git commit: HDFS-9388. Decommission related code to support Maintenance State for datanodes.
Date Wed, 09 Aug 2017 17:36:24 GMT
HDFS-9388. Decommission related code to support Maintenance State for datanodes.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79df1e75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79df1e75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79df1e75

Branch: refs/heads/HADOOP-13345
Commit: 79df1e750ef558afed6d166ce225a23061b36aed
Parents: 12e44e7
Author: Manoj Govindassamy <manojpec@apache.org>
Authored: Wed Aug 2 14:22:41 2017 -0700
Committer: Manoj Govindassamy <manojpec@apache.org>
Committed: Wed Aug 2 14:22:46 2017 -0700

----------------------------------------------------------------------
 .../blockmanagement/DatanodeAdminManager.java   | 756 +++++++++++++++++++
 .../server/blockmanagement/DatanodeManager.java |  30 +-
 .../blockmanagement/DecommissionManager.java    | 741 ------------------
 .../hadoop/hdfs/server/namenode/BackupNode.java |   2 +-
 .../src/main/resources/hdfs-default.xml         |  21 +-
 .../apache/hadoop/hdfs/TestDecommission.java    |  44 +-
 .../blockmanagement/BlockManagerTestUtil.java   |   2 +-
 ...constructStripedBlocksWithRackAwareness.java |   5 +-
 .../TestReplicationPolicyConsiderLoad.java      |   2 +-
 .../namenode/TestDecommissioningStatus.java     |   6 +-
 .../TestDefaultBlockPlacementPolicy.java        |   4 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   |  13 +-
 .../server/namenode/TestNameNodeMXBean.java     |   6 +-
 .../namenode/TestNamenodeCapacityReport.java    |   8 +-
 14 files changed, 833 insertions(+), 807 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
new file mode 100644
index 0000000..928036a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -0,0 +1,756 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.util.AbstractList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.util.ChunkedArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages decommissioning and maintenance state for DataNodes. A background
+ * monitor thread periodically checks the status of DataNodes that are
+ * decommissioning or entering maintenance state.
+ * <p/>
+ * A DataNode can be decommissioned in a few situations:
+ * <ul>
+ * <li>If a DN is dead, it is decommissioned immediately.</li>
+ * <li>If a DN is alive, it is decommissioned after all of its blocks
+ * are sufficiently replicated. Merely under-replicated blocks do not
+ * block decommissioning as long as they are above a replication
+ * threshold.</li>
+ * </ul>
+ * In the second case, the DataNode transitions to a DECOMMISSION_INPROGRESS
+ * state and is tracked by the monitor thread. The monitor periodically scans
+ * through the list of insufficiently replicated blocks on these DataNodes to
+ * determine if they can be DECOMMISSIONED. The monitor also prunes this list
+ * as blocks become replicated, so monitor scans will become more efficient
+ * over time.
+ * <p/>
+ * DECOMMISSION_INPROGRESS nodes that become dead do not progress to
+ * DECOMMISSIONED until they become live again. This prevents potential
+ * durability loss for singly-replicated blocks (see HDFS-6791).
+ * <p/>
+ * DataNodes can also be put under maintenance state for any short duration
+ * maintenance operations. Unlike decommissioning, blocks are not always
+ * re-replicated for the DataNodes to enter maintenance state. When the
+ * blocks are replicated at least dfs.namenode.maintenance.replication.min,
+ * DataNodes transition to IN_MAINTENANCE state. Otherwise, just like
+ * decommissioning, DataNodes transition to ENTERING_MAINTENANCE state and
+ * wait for the blocks to be sufficiently replicated and then transition to
+ * IN_MAINTENANCE state. The block replication factor is relaxed for a maximum
+ * of maintenance expiry time. When DataNodes don't transition or join the
+ * cluster back by expiry time, blocks are re-replicated just as in
+ * decommissioning case so as to avoid read or write performance degradation.
+ * <p/>
+ * This class depends on the FSNamesystem lock for synchronization.
+ */
+@InterfaceAudience.Private
+public class DatanodeAdminManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminManager.class);
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final HeartbeatManager hbManager;
+  private final ScheduledExecutorService executor;
+
+  /**
+   * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
+   * datanodes that are being tracked so they can be marked as
+   * DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as
+   * IN_MAINTENANCE, the node remains in the map until its maintenance
+   * expiration is checked during a monitor tick.
+   * <p/>
+   * This holds a set of references to the under-replicated blocks on the DN at
+   * the time the DN is added to the map, i.e. the blocks that are preventing
+   * the node from being marked as decommissioned. During a monitor tick, this
+   * list is pruned as blocks become replicated.
+   * <p/>
+   * Note also that the reference to the list of under-replicated blocks
+   * will be null on initial add
+   * <p/>
+   * However, this map can become out-of-date since it is not updated by block
+   * reports or other events. Before finally being marked as decommissioned,
+   * another check is done with the actual block map.
+   */
+  private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
+      outOfServiceNodeBlocks;
+
+  /**
+   * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
+   * limit the impact on NN memory consumption, we limit the number of nodes in
+   * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
+   */
+  private final Queue<DatanodeDescriptor> pendingNodes;
+  private Monitor monitor = null;
+
+  DatanodeAdminManager(final Namesystem namesystem,
+      final BlockManager blockManager, final HeartbeatManager hbManager) {
+    this.namesystem = namesystem;
+    this.blockManager = blockManager;
+    this.hbManager = hbManager;
+
+    executor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("DatanodeAdminMonitor-%d")
+            .setDaemon(true).build());
+    outOfServiceNodeBlocks = new TreeMap<>();
+    pendingNodes = new LinkedList<>();
+  }
+
+  /**
+   * Start the DataNode admin monitor thread.
+   * @param conf
+   */
+  void activate(Configuration conf) {
+    final int intervalSecs = (int) conf.getTimeDuration(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT,
+        TimeUnit.SECONDS);
+    checkArgument(intervalSecs >= 0, "Cannot set a negative " +
+        "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
+
+    int blocksPerInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
+
+    final String deprecatedKey =
+        "dfs.namenode.decommission.nodes.per.interval";
+    final String strNodes = conf.get(deprecatedKey);
+    if (strNodes != null) {
+      LOG.warn("Deprecated configuration key {} will be ignored.",
+          deprecatedKey);
+      LOG.warn("Please update your configuration to use {} instead.",
+          DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+    }
+
+    checkArgument(blocksPerInterval > 0,
+        "Must set a positive value for "
+        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+
+    final int maxConcurrentTrackedNodes = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        DFSConfigKeys
+            .DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
+    checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
+        "value for "
+        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
+
+    monitor = new Monitor(blocksPerInterval, maxConcurrentTrackedNodes);
+    executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
+        TimeUnit.SECONDS);
+
+    LOG.debug("Activating DatanodeAdminManager with interval {} seconds, " +
+            "{} max blocks per interval, " +
+            "{} max concurrently tracked nodes.", intervalSecs,
+        blocksPerInterval, maxConcurrentTrackedNodes);
+  }
+
+  /**
+   * Stop the admin monitor thread, waiting briefly for it to terminate.
+   */
+  void close() {
+    executor.shutdownNow();
+    try {
+      executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {}
+  }
+
+  /**
+   * Start decommissioning the specified datanode.
+   * @param node
+   */
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
+    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.startDecommission(node);
+      // hbManager.startDecommission will set dead node to decommissioned.
+      if (node.isDecommissionInProgress()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting decommission of {} {} with {} blocks",
+              node, storage, storage.numBlocks());
+        }
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
+        pendingNodes.add(node);
+      }
+    } else {
+      LOG.trace("startDecommission: Node {} in {}, nothing to do." +
+          node, node.getAdminState());
+    }
+  }
+
+  /**
+   * Stop decommissioning the specified datanode.
+   * @param node
+   */
+  @VisibleForTesting
+  public void stopDecommission(DatanodeDescriptor node) {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.stopDecommission(node);
+      // extra redundancy blocks will be detected and processed when
+      // the dead node comes back and send in its full block report.
+      if (node.isAlive()) {
+        blockManager.processExtraRedundancyBlocksOnInService(node);
+      }
+      // Remove from tracking in DatanodeAdminManager
+      pendingNodes.remove(node);
+      outOfServiceNodeBlocks.remove(node);
+    } else {
+      LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
+          node, node.getAdminState());
+    }
+  }
+
+  /**
+   * Start maintenance of the specified datanode.
+   * @param node
+   */
+  @VisibleForTesting
+  public void startMaintenance(DatanodeDescriptor node,
+      long maintenanceExpireTimeInMS) {
+    // Even if the node is already in maintenance, we still need to adjust
+    // the expiration time.
+    node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
+    if (!node.isMaintenance()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.startMaintenance(node);
+      // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
+      if (node.isEnteringMaintenance()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting maintenance of {} {} with {} blocks",
+              node, storage, storage.numBlocks());
+        }
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
+      }
+      // Track the node regardless whether it is ENTERING_MAINTENANCE or
+      // IN_MAINTENANCE to support maintenance expiration.
+      pendingNodes.add(node);
+    } else {
+      LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
+          node, node.getAdminState());
+    }
+  }
+
+
+  /**
+   * Stop maintenance of the specified datanode.
+   * @param node
+   */
+  @VisibleForTesting
+  public void stopMaintenance(DatanodeDescriptor node) {
+    if (node.isMaintenance()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.stopMaintenance(node);
+
+      // extra redundancy blocks will be detected and processed when
+      // the dead node comes back and send in its full block report.
+      if (!node.isAlive()) {
+        // The node became dead when it was in maintenance, at which point
+        // the replicas weren't removed from block maps.
+        // When the node leaves maintenance, the replicas should be removed
+        // from the block maps to trigger the necessary replication to
+        // maintain the safety property of "# of live replicas + maintenance
+        // replicas" >= the expected redundancy.
+        blockManager.removeBlocksAssociatedTo(node);
+      } else {
+        // Even though putting nodes in maintenance node doesn't cause live
+        // replicas to match expected replication factor, it is still possible
+        // to have over replicated when the node leaves maintenance node.
+        // First scenario:
+        // a. Node became dead when it is at AdminStates.NORMAL, thus
+        //    block is replicated so that 3 replicas exist on other nodes.
+        // b. Admins put the dead node into maintenance mode and then
+        //    have the node rejoin the cluster.
+        // c. Take the node out of maintenance mode.
+        // Second scenario:
+        // a. With replication factor 3, set one replica to maintenance node,
+        //    thus block has 1 maintenance replica and 2 live replicas.
+        // b. Change the replication factor to 2. The block will still have
+        //    1 maintenance replica and 2 live replicas.
+        // c. Take the node out of maintenance mode.
+        blockManager.processExtraRedundancyBlocksOnInService(node);
+      }
+
+      // Remove from tracking in DatanodeAdminManager
+      pendingNodes.remove(node);
+      outOfServiceNodeBlocks.remove(node);
+    } else {
+      LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
+          node, node.getAdminState());
+    }
+  }
+
+  private void setDecommissioned(DatanodeDescriptor dn) {
+    dn.setDecommissioned();
+    LOG.info("Decommissioning complete for node {}", dn);
+  }
+
+  private void setInMaintenance(DatanodeDescriptor dn) {
+    dn.setInMaintenance();
+    LOG.info("Node {} has entered maintenance mode.", dn);
+  }
+
+  /**
+   * Checks whether a block is sufficiently replicated/stored for
+   * DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE datanodes. For replicated
+   * blocks or striped blocks, full-strength replication or storage is not
+   * always necessary, hence "sufficient".
+   * @return true if sufficient, else false.
+   */
+  private boolean isSufficient(BlockInfo block, BlockCollection bc,
+      NumberReplicas numberReplicas, boolean isDecommission) {
+    if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
+      // Block has enough replica, skip
+      LOG.trace("Block {} does not need replication.", block);
+      return true;
+    }
+
+    final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
+        numberReplicas);
+    final int numLive = numberReplicas.liveReplicas();
+
+    // Block is under-replicated
+    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
+        numLive);
+    if (isDecommission && numExpected > numLive) {
+      if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
+        // Can decom a UC block as long as there will still be minReplicas
+        if (blockManager.hasMinStorage(block, numLive)) {
+          LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+              + ">= minR ({})", block, numLive,
+              blockManager.getMinStorageNum(block));
+          return true;
+        } else {
+          LOG.trace("UC block {} insufficiently-replicated since numLive "
+              + "({}) < minR ({})", block, numLive,
+              blockManager.getMinStorageNum(block));
+        }
+      } else {
+        // Can decom a non-UC as long as the default replication is met
+        if (numLive >= blockManager.getDefaultStorageNum(block)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private void logBlockReplicationInfo(BlockInfo block,
+      BlockCollection bc,
+      DatanodeDescriptor srcNode, NumberReplicas num,
+      Iterable<DatanodeStorageInfo> storages) {
+    if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
+      return;
+    }
+
+    int curReplicas = num.liveReplicas();
+    int curExpectedRedundancy = blockManager.getExpectedRedundancyNum(block);
+    StringBuilder nodeList = new StringBuilder();
+    for (DatanodeStorageInfo storage : storages) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+      nodeList.append(node);
+      nodeList.append(" ");
+    }
+    NameNode.blockStateChangeLog.info(
+        "Block: " + block + ", Expected Replicas: "
+        + curExpectedRedundancy + ", live replicas: " + curReplicas
+        + ", corrupt replicas: " + num.corruptReplicas()
+        + ", decommissioned replicas: " + num.decommissioned()
+        + ", decommissioning replicas: " + num.decommissioning()
+        + ", maintenance replicas: " + num.maintenanceReplicas()
+        + ", live entering maintenance replicas: "
+        + num.liveEnteringMaintenanceReplicas()
+        + ", excess replicas: " + num.excessReplicas()
+        + ", Is Open File: " + bc.isUnderConstruction()
+        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+        + srcNode + ", Is current datanode decommissioning: "
+        + srcNode.isDecommissionInProgress() +
+        ", Is current datanode entering maintenance: "
+        + srcNode.isEnteringMaintenance());
+  }
+
  /** @return number of nodes queued but not yet tracked by the monitor. */
  @VisibleForTesting
  public int getNumPendingNodes() {
    return pendingNodes.size();
  }
+
  /** @return number of nodes currently tracked in outOfServiceNodeBlocks. */
  @VisibleForTesting
  public int getNumTrackedNodes() {
    return outOfServiceNodeBlocks.size();
  }
+
  /** @return number of nodes examined during the monitor's last tick. */
  @VisibleForTesting
  public int getNumNodesChecked() {
    return monitor.numNodesChecked;
  }
+
+  /**
+   * Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or
+   * ENTERING_MAINTENANCE state.
+   * <p/>
+   * Since this is done while holding the namesystem lock,
+   * the amount of work per monitor tick is limited.
+   */
+  private class Monitor implements Runnable {
+    /**
+     * The maximum number of blocks to check per tick.
+     */
+    private final int numBlocksPerCheck;
+    /**
+     * The maximum number of nodes to track in outOfServiceNodeBlocks.
+     * A value of 0 means no limit.
+     */
+    private final int maxConcurrentTrackedNodes;
+    /**
+     * The number of blocks that have been checked on this tick.
+     */
+    private int numBlocksChecked = 0;
+    /**
+     * The number of blocks checked after (re)holding lock.
+     */
+    private int numBlocksCheckedPerLock = 0;
+    /**
+     * The number of nodes that have been checked on this tick. Used for
+     * statistics.
+     */
+    private int numNodesChecked = 0;
+    /**
+     * The last datanode in outOfServiceNodeBlocks that we've processed.
+     */
+    private DatanodeDescriptor iterkey = new DatanodeDescriptor(
+        new DatanodeID("", "", "", 0, 0, 0, 0));
+
+    Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
+      this.numBlocksPerCheck = numBlocksPerCheck;
+      this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
+    }
+
+    private boolean exceededNumBlocksPerCheck() {
+      LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
+      return numBlocksChecked >= numBlocksPerCheck;
+    }
+
+    @Override
+    public void run() {
+      if (!namesystem.isRunning()) {
+        LOG.info("Namesystem is not running, skipping " +
+            "decommissioning/maintenance checks.");
+        return;
+      }
+      // Reset the checked count at beginning of each iteration
+      numBlocksChecked = 0;
+      numBlocksCheckedPerLock = 0;
+      numNodesChecked = 0;
+      // Check decommission or maintenance progress.
+      namesystem.writeLock();
+      try {
+        processPendingNodes();
+        check();
+      } finally {
+        namesystem.writeUnlock();
+      }
+      if (numBlocksChecked + numNodesChecked > 0) {
+        LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
+            numNodesChecked);
+      }
+    }
+
+    /**
+     * Pop datanodes off the pending list and into decomNodeBlocks,
+     * subject to the maxConcurrentTrackedNodes limit.
+     */
+    private void processPendingNodes() {
+      while (!pendingNodes.isEmpty() &&
+          (maxConcurrentTrackedNodes == 0 ||
+          outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+        outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
+      }
+    }
+
+    private void check() {
+      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
+          it = new CyclicIteration<>(outOfServiceNodeBlocks,
+              iterkey).iterator();
+      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
+
+      while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
+          .isRunning()) {
+        numNodesChecked++;
+        final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
+            entry = it.next();
+        final DatanodeDescriptor dn = entry.getKey();
+        AbstractList<BlockInfo> blocks = entry.getValue();
+        boolean fullScan = false;
+        if (dn.isMaintenance() && dn.maintenanceExpired()) {
+          // If maintenance expires, stop tracking it.
+          stopMaintenance(dn);
+          toRemove.add(dn);
+          continue;
+        }
+        if (dn.isInMaintenance()) {
+          // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
+          continue;
+        }
+        if (blocks == null) {
+          // This is a newly added datanode, run through its list to schedule
+          // under-replicated blocks for replication and collect the blocks
+          // that are insufficiently replicated for further tracking
+          LOG.debug("Newly-added node {}, doing full scan to find " +
+              "insufficiently-replicated blocks.", dn);
+          blocks = handleInsufficientlyStored(dn);
+          outOfServiceNodeBlocks.put(dn, blocks);
+          fullScan = true;
+        } else {
+          // This is a known datanode, check if its # of insufficiently
+          // replicated blocks has dropped to zero and if it can move
+          // to the next state.
+          LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
+          pruneReliableBlocks(dn, blocks);
+        }
+        if (blocks.size() == 0) {
+          if (!fullScan) {
+            // If we didn't just do a full scan, need to re-check with the
+            // full block map.
+            //
+            // We've replicated all the known insufficiently replicated
+            // blocks. Re-check with the full block map before finally
+            // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
+            LOG.debug("Node {} has finished replicating current set of "
+                + "blocks, checking with the full block map.", dn);
+            blocks = handleInsufficientlyStored(dn);
+            outOfServiceNodeBlocks.put(dn, blocks);
+          }
+          // If the full scan is clean AND the node liveness is okay,
+          // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
+          final boolean isHealthy =
+              blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
+          if (blocks.size() == 0 && isHealthy) {
+            if (dn.isDecommissionInProgress()) {
+              setDecommissioned(dn);
+              toRemove.add(dn);
+            } else if (dn.isEnteringMaintenance()) {
+              // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
+              // to track maintenance expiration.
+              setInMaintenance(dn);
+            } else {
+              Preconditions.checkState(false,
+                  "A node is in an invalid state!");
+            }
+            LOG.debug("Node {} is sufficiently replicated and healthy, "
+                + "marked as {}.", dn.getAdminState());
+          } else {
+            LOG.debug("Node {} {} healthy."
+                + " It needs to replicate {} more blocks."
+                + " {} is still in progress.", dn,
+                isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
+          }
+        } else {
+          LOG.debug("Node {} still has {} blocks to replicate "
+              + "before it is a candidate to finish {}.",
+              dn, blocks.size(), dn.getAdminState());
+        }
+        iterkey = dn;
+      }
+      // Remove the datanodes that are DECOMMISSIONED or in service after
+      // maintenance expiration.
+      for (DatanodeDescriptor dn : toRemove) {
+        Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
+            "Removing a node that is not yet decommissioned or in service!");
+        outOfServiceNodeBlocks.remove(dn);
+      }
+    }
+
+    /**
+     * Removes reliable blocks from the block list of a datanode.
+     */
+    private void pruneReliableBlocks(final DatanodeDescriptor datanode,
+        AbstractList<BlockInfo> blocks) {
+      processBlocksInternal(datanode, blocks.iterator(), null, true);
+    }
+
+    /**
+     * Returns a list of blocks on a datanode that are insufficiently
+     * replicated or require recovery, i.e. requiring recovery and
+     * should prevent decommission or maintenance.
+     * <p/>
+     * As part of this, it also schedules replication/recovery work.
+     *
+     * @return List of blocks requiring recovery
+     */
+    private AbstractList<BlockInfo> handleInsufficientlyStored(
+        final DatanodeDescriptor datanode) {
+      AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
+      processBlocksInternal(datanode, datanode.getBlockIterator(),
+          insufficient, false);
+      return insufficient;
+    }
+
+    /**
+     * Used while checking if DECOMMISSION_INPROGRESS datanodes can be
+     * marked as DECOMMISSIONED or ENTERING_MAINTENANCE datanodes can be
+     * marked as IN_MAINTENANCE. Combines shared logic of pruneReliableBlocks
+     * and handleInsufficientlyStored.
+     *
+     * @param datanode                    Datanode
+     * @param it                          Iterator over the blocks on the
+     *                                    datanode
+     * @param insufficientList            Return parameter. If it's not null,
+     *                                    will contain the insufficiently
+     *                                    replicated-blocks from the list.
+     * @param pruneReliableBlocks         whether to remove blocks reliable
+     *                                    enough from the iterator
+     */
+    private void processBlocksInternal(
+        final DatanodeDescriptor datanode,
+        final Iterator<BlockInfo> it,
+        final List<BlockInfo> insufficientList,
+        boolean pruneReliableBlocks) {
+      boolean firstReplicationLog = true;
+      // Low redundancy in UC Blocks only
+      int lowRedundancyInOpenFiles = 0;
+      // All low redundancy blocks. Includes lowRedundancyInOpenFiles.
+      int lowRedundancyBlocks = 0;
+      // All maintenance and decommission replicas.
+      int outOfServiceOnlyReplicas = 0;
+      while (it.hasNext()) {
+        if (insufficientList == null
+            && numBlocksCheckedPerLock >= numBlocksPerCheck) {
+          // During fullscan insufficientlyReplicated will NOT be null, iterator
+          // will be DN's iterator. So should not yield lock, otherwise
+          // ConcurrentModificationException could occur.
+          // Once the fullscan done, iterator will be a copy. So can yield the
+          // lock.
+          // Yielding is required in case of block number is greater than the
+          // configured per-iteration-limit.
+          namesystem.writeUnlock();
+          try {
+            LOG.debug("Yielded lock during decommission/maintenance check");
+            Thread.sleep(0, 500);
+          } catch (InterruptedException ignored) {
+            return;
+          }
+          // reset
+          numBlocksCheckedPerLock = 0;
+          namesystem.writeLock();
+        }
+        numBlocksChecked++;
+        numBlocksCheckedPerLock++;
+        final BlockInfo block = it.next();
+        // Remove the block from the list if it's no longer in the block map,
+        // e.g. the containing file has been deleted
+        if (blockManager.blocksMap.getStoredBlock(block) == null) {
+          LOG.trace("Removing unknown block {}", block);
+          it.remove();
+          continue;
+        }
+
+        long bcId = block.getBlockCollectionId();
+        if (bcId == INodeId.INVALID_INODE_ID) {
+          // Orphan block, will be invalidated eventually. Skip.
+          continue;
+        }
+
+        final BlockCollection bc = blockManager.getBlockCollection(block);
+        final NumberReplicas num = blockManager.countNodes(block);
+        final int liveReplicas = num.liveReplicas();
+
+        // Schedule low redundancy blocks for reconstruction
+        // if not already pending.
+        boolean isDecommission = datanode.isDecommissionInProgress();
+        boolean neededReconstruction = isDecommission ?
+            blockManager.isNeededReconstruction(block, num) :
+            blockManager.isNeededReconstructionForMaintenance(block, num);
+        if (neededReconstruction) {
+          if (!blockManager.neededReconstruction.contains(block) &&
+              blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
+              blockManager.isPopulatingReplQueues()) {
+            // Process these blocks only when active NN is out of safe mode.
+            blockManager.neededReconstruction.add(block,
+                liveReplicas, num.readOnlyReplicas(),
+                num.outOfServiceReplicas(),
+                blockManager.getExpectedRedundancyNum(block));
+          }
+        }
+
+        // Even if the block is without sufficient redundancy,
+        // it might not block decommission/maintenance if it
+        // has sufficient redundancy.
+        if (isSufficient(block, bc, num, isDecommission)) {
+          if (pruneReliableBlocks) {
+            it.remove();
+          }
+          continue;
+        }
+
+        // We've found a block without sufficient redundancy.
+        if (insufficientList != null) {
+          insufficientList.add(block);
+        }
+        // Log if this is our first time through
+        if (firstReplicationLog) {
+          logBlockReplicationInfo(block, bc, datanode, num,
+              blockManager.blocksMap.getStorages(block));
+          firstReplicationLog = false;
+        }
+        // Update various counts
+        lowRedundancyBlocks++;
+        if (bc.isUnderConstruction()) {
+          lowRedundancyInOpenFiles++;
+        }
+        if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
+          outOfServiceOnlyReplicas++;
+        }
+      }
+
+      datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles,
+          lowRedundancyBlocks, outOfServiceOnlyReplicas);
+    }
+  }
+
  /**
   * Runs one monitor tick synchronously on the executor thread; tests call
   * this instead of waiting for the scheduled interval to elapse.
   */
  @VisibleForTesting
  void runMonitorForTest() throws ExecutionException, InterruptedException {
    executor.submit(monitor).get();
  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 2c5779a..d705fec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -75,7 +75,7 @@ public class DatanodeManager {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
-  private final DecommissionManager decomManager;
+  private final DatanodeAdminManager datanodeAdminManager;
   private final HeartbeatManager heartbeatManager;
   private final FSClusterStats fsClusterStats;
 
@@ -223,9 +223,10 @@ public class DatanodeManager {
       networktopology = NetworkTopology.getInstance(conf);
     }
 
-    this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
-    this.decomManager = new DecommissionManager(namesystem, blockManager,
-        heartbeatManager);
+    this.heartbeatManager = new HeartbeatManager(namesystem,
+        blockManager, conf);
+    this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
+        blockManager, heartbeatManager);
     this.fsClusterStats = newFSClusterStats();
     this.dataNodePeerStatsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
@@ -372,12 +373,12 @@ public class DatanodeManager {
   }
   
   void activate(final Configuration conf) {
-    decomManager.activate(conf);
+    datanodeAdminManager.activate(conf);
     heartbeatManager.activate();
   }
 
   void close() {
-    decomManager.close();
+    datanodeAdminManager.close();
     heartbeatManager.close();
   }
 
@@ -392,8 +393,8 @@ public class DatanodeManager {
   }
 
   @VisibleForTesting
-  public DecommissionManager getDecomManager() {
-    return decomManager;
+  public DatanodeAdminManager getDatanodeAdminManager() {
+    return datanodeAdminManager;
   }
 
   public HostConfigManager getHostConfigManager() {
@@ -991,9 +992,9 @@ public class DatanodeManager {
         hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg);
     // If the registered node is in exclude list, then decommission it
     if (getHostConfigManager().isExcluded(nodeReg)) {
-      decomManager.startDecommission(nodeReg);
+      datanodeAdminManager.startDecommission(nodeReg);
     } else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
-      decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
+      datanodeAdminManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
     }
   }
 
@@ -1219,12 +1220,13 @@ public class DatanodeManager {
         long maintenanceExpireTimeInMS =
             hostConfigManager.getMaintenanceExpirationTimeInMS(node);
         if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
-          decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
+          datanodeAdminManager.startMaintenance(
+              node, maintenanceExpireTimeInMS);
         } else if (hostConfigManager.isExcluded(node)) {
-          decomManager.startDecommission(node);
+          datanodeAdminManager.startDecommission(node);
         } else {
-          decomManager.stopMaintenance(node);
-          decomManager.stopDecommission(node);
+          datanodeAdminManager.stopMaintenance(node);
+          datanodeAdminManager.stopDecommission(node);
         }
       }
       node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
deleted file mode 100644
index ae79826..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ /dev/null
@@ -1,741 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.util.AbstractList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.namenode.INodeId;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.util.CyclicIteration;
-import org.apache.hadoop.util.ChunkedArrayList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Manages datanode decommissioning. A background monitor thread 
- * periodically checks the status of datanodes that are in-progress of 
- * decommissioning.
- * <p/>
- * A datanode can be decommissioned in a few situations:
- * <ul>
- * <li>If a DN is dead, it is decommissioned immediately.</li>
- * <li>If a DN is alive, it is decommissioned after all of its blocks 
- * are sufficiently replicated. Merely under-replicated blocks do not 
- * block decommissioning as long as they are above a replication 
- * threshold.</li>
- * </ul>
- * In the second case, the datanode transitions to a 
- * decommission-in-progress state and is tracked by the monitor thread. The 
- * monitor periodically scans through the list of insufficiently replicated
- * blocks on these datanodes to 
- * determine if they can be decommissioned. The monitor also prunes this list 
- * as blocks become replicated, so monitor scans will become more efficient 
- * over time.
- * <p/>
- * Decommission-in-progress nodes that become dead do not progress to 
- * decommissioned until they become live again. This prevents potential 
- * durability loss for singly-replicated blocks (see HDFS-6791).
- * <p/>
- * This class depends on the FSNamesystem lock for synchronization.
- */
-@InterfaceAudience.Private
-public class DecommissionManager {
-  private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
-      .class);
-
-  private final Namesystem namesystem;
-  private final BlockManager blockManager;
-  private final HeartbeatManager hbManager;
-  private final ScheduledExecutorService executor;
-
-  /**
-   * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
-   * datanodes that are being tracked so they can be marked as
-   * DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as
-   * IN_MAINTENANCE, the node remains in the map until
-   * maintenance expires checked during a monitor tick.
-   * <p/>
-   * This holds a set of references to the under-replicated blocks on the DN at
-   * the time the DN is added to the map, i.e. the blocks that are preventing
-   * the node from being marked as decommissioned. During a monitor tick, this
-   * list is pruned as blocks becomes replicated.
-   * <p/>
-   * Note also that the reference to the list of under-replicated blocks 
-   * will be null on initial add
-   * <p/>
-   * However, this map can become out-of-date since it is not updated by block
-   * reports or other events. Before being finally marking as decommissioned,
-   * another check is done with the actual block map.
-   */
-  private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
-      outOfServiceNodeBlocks;
-
-  /**
-   * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
-   * limit the impact on NN memory consumption, we limit the number of nodes in
-   * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
-   */
-  private final Queue<DatanodeDescriptor> pendingNodes;
-
-  private Monitor monitor = null;
-
-  DecommissionManager(final Namesystem namesystem,
-      final BlockManager blockManager, final HeartbeatManager hbManager) {
-    this.namesystem = namesystem;
-    this.blockManager = blockManager;
-    this.hbManager = hbManager;
-
-    executor = Executors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
-            .setDaemon(true).build());
-    outOfServiceNodeBlocks = new TreeMap<>();
-    pendingNodes = new LinkedList<>();
-  }
-
-  /**
-   * Start the decommission monitor thread.
-   * @param conf
-   */
-  void activate(Configuration conf) {
-    final int intervalSecs = (int) conf.getTimeDuration(
-        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
-        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT,
-        TimeUnit.SECONDS);
-    checkArgument(intervalSecs >= 0, "Cannot set a negative " +
-        "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
-
-    int blocksPerInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
-        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
-
-    final String deprecatedKey =
-        "dfs.namenode.decommission.nodes.per.interval";
-    final String strNodes = conf.get(deprecatedKey);
-    if (strNodes != null) {
-      LOG.warn("Deprecated configuration key {} will be ignored.",
-          deprecatedKey);
-      LOG.warn("Please update your configuration to use {} instead.", 
-          DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
-    }
-
-    checkArgument(blocksPerInterval > 0,
-        "Must set a positive value for "
-        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
-
-    final int maxConcurrentTrackedNodes = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
-        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
-    checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
-        "value for "
-        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
-
-    monitor = new Monitor(blocksPerInterval, maxConcurrentTrackedNodes);
-    executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
-        TimeUnit.SECONDS);
-
-    LOG.debug("Activating DecommissionManager with interval {} seconds, " +
-            "{} max blocks per interval, " +
-            "{} max concurrently tracked nodes.", intervalSecs,
-        blocksPerInterval, maxConcurrentTrackedNodes);
-  }
-
-  /**
-   * Stop the decommission monitor thread, waiting briefly for it to terminate.
-   */
-  void close() {
-    executor.shutdownNow();
-    try {
-      executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {}
-  }
-
-  /**
-   * Start decommissioning the specified datanode. 
-   * @param node
-   */
-  @VisibleForTesting
-  public void startDecommission(DatanodeDescriptor node) {
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      // Update DN stats maintained by HeartbeatManager
-      hbManager.startDecommission(node);
-      // hbManager.startDecommission will set dead node to decommissioned.
-      if (node.isDecommissionInProgress()) {
-        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-          LOG.info("Starting decommission of {} {} with {} blocks",
-              node, storage, storage.numBlocks());
-        }
-        node.getLeavingServiceStatus().setStartTime(monotonicNow());
-        pendingNodes.add(node);
-      }
-    } else {
-      LOG.trace("startDecommission: Node {} in {}, nothing to do." +
-          node, node.getAdminState());
-    }
-  }
-
-  /**
-   * Stop decommissioning the specified datanode. 
-   * @param node
-   */
-  @VisibleForTesting
-  public void stopDecommission(DatanodeDescriptor node) {
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      // Update DN stats maintained by HeartbeatManager
-      hbManager.stopDecommission(node);
-      // extra redundancy blocks will be detected and processed when
-      // the dead node comes back and sends in its full block report.
-      if (node.isAlive()) {
-        blockManager.processExtraRedundancyBlocksOnInService(node);
-      }
-      // Remove from tracking in DecommissionManager
-      pendingNodes.remove(node);
-      outOfServiceNodeBlocks.remove(node);
-    } else {
-      LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
-          node, node.getAdminState());
-    }
-  }
-
-  /**
-   * Start maintenance of the specified datanode.
-   * @param node
-   */
-  @VisibleForTesting
-  public void startMaintenance(DatanodeDescriptor node,
-      long maintenanceExpireTimeInMS) {
-    // Even if the node is already in maintenance, we still need to adjust
-    // the expiration time.
-    node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
-    if (!node.isMaintenance()) {
-      // Update DN stats maintained by HeartbeatManager
-      hbManager.startMaintenance(node);
-      // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
-      if (node.isEnteringMaintenance()) {
-        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-          LOG.info("Starting maintenance of {} {} with {} blocks",
-              node, storage, storage.numBlocks());
-        }
-        node.getLeavingServiceStatus().setStartTime(monotonicNow());
-      }
-      // Track the node regardless whether it is ENTERING_MAINTENANCE or
-      // IN_MAINTENANCE to support maintenance expiration.
-      pendingNodes.add(node);
-    } else {
-      LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
-          node, node.getAdminState());
-    }
-  }
-
-
-  /**
-   * Stop maintenance of the specified datanode.
-   * @param node
-   */
-  @VisibleForTesting
-  public void stopMaintenance(DatanodeDescriptor node) {
-    if (node.isMaintenance()) {
-      // Update DN stats maintained by HeartbeatManager
-      hbManager.stopMaintenance(node);
-
-      // extra redundancy blocks will be detected and processed when
-      // the dead node comes back and sends in its full block report.
-      if (!node.isAlive()) {
-        // The node became dead when it was in maintenance, at which point
-        // the replicas weren't removed from block maps.
-        // When the node leaves maintenance, the replicas should be removed
-        // from the block maps to trigger the necessary replication to
-        // maintain the safety property of "# of live replicas + maintenance
-        // replicas" >= the expected redundancy.
-        blockManager.removeBlocksAssociatedTo(node);
-      } else {
-        // Even though putting nodes in maintenance node doesn't cause live
-        // replicas to match expected replication factor, it is still possible
-        // to have over replicated when the node leaves maintenance node.
-        // First scenario:
-        // a. Node became dead when it is at AdminStates.NORMAL, thus
-        //    block is replicated so that 3 replicas exist on other nodes.
-        // b. Admins put the dead node into maintenance mode and then
-        //    have the node rejoin the cluster.
-        // c. Take the node out of maintenance mode.
-        // Second scenario:
-        // a. With replication factor 3, set one replica to maintenance node,
-        //    thus block has 1 maintenance replica and 2 live replicas.
-        // b. Change the replication factor to 2. The block will still have
-        //    1 maintenance replica and 2 live replicas.
-        // c. Take the node out of maintenance mode.
-        blockManager.processExtraRedundancyBlocksOnInService(node);
-      }
-
-      // Remove from tracking in DecommissionManager
-      pendingNodes.remove(node);
-      outOfServiceNodeBlocks.remove(node);
-    } else {
-      LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
-          node, node.getAdminState());
-    }
-  }
-
-  private void setDecommissioned(DatanodeDescriptor dn) {
-    dn.setDecommissioned();
-    LOG.info("Decommissioning complete for node {}", dn);
-  }
-
-  private void setInMaintenance(DatanodeDescriptor dn) {
-    dn.setInMaintenance();
-    LOG.info("Node {} has entered maintenance mode.", dn);
-  }
-
-  /**
-   * Checks whether a block is sufficiently replicated/stored for
-   * decommissioning. For replicated blocks or striped blocks, full-strength
-   * replication or storage is not always necessary, hence "sufficient".
-   * @return true if sufficient, else false.
-   */
-  private boolean isSufficient(BlockInfo block, BlockCollection bc,
-      NumberReplicas numberReplicas, boolean isDecommission) {
-    if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
-      // Block has enough replica, skip
-      LOG.trace("Block {} does not need replication.", block);
-      return true;
-    }
-
-    final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
-        numberReplicas);
-    final int numLive = numberReplicas.liveReplicas();
-
-    // Block is under-replicated
-    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
-        numLive);
-    if (isDecommission && numExpected > numLive) {
-      if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
-        // Can decom a UC block as long as there will still be minReplicas
-        if (blockManager.hasMinStorage(block, numLive)) {
-          LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
-              + ">= minR ({})", block, numLive,
-              blockManager.getMinStorageNum(block));
-          return true;
-        } else {
-          LOG.trace("UC block {} insufficiently-replicated since numLive "
-              + "({}) < minR ({})", block, numLive,
-              blockManager.getMinStorageNum(block));
-        }
-      } else {
-        // Can decom a non-UC as long as the default replication is met
-        if (numLive >= blockManager.getDefaultStorageNum(block)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private void logBlockReplicationInfo(BlockInfo block,
-      BlockCollection bc,
-      DatanodeDescriptor srcNode, NumberReplicas num,
-      Iterable<DatanodeStorageInfo> storages) {
-    if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
-      return;
-    }
-
-    int curReplicas = num.liveReplicas();
-    int curExpectedRedundancy = blockManager.getExpectedRedundancyNum(block);
-    StringBuilder nodeList = new StringBuilder();
-    for (DatanodeStorageInfo storage : storages) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      nodeList.append(node);
-      nodeList.append(" ");
-    }
-    NameNode.blockStateChangeLog.info(
-        "Block: " + block + ", Expected Replicas: "
-        + curExpectedRedundancy + ", live replicas: " + curReplicas
-        + ", corrupt replicas: " + num.corruptReplicas()
-        + ", decommissioned replicas: " + num.decommissioned()
-        + ", decommissioning replicas: " + num.decommissioning()
-        + ", maintenance replicas: " + num.maintenanceReplicas()
-        + ", live entering maintenance replicas: "
-        + num.liveEnteringMaintenanceReplicas()
-        + ", excess replicas: " + num.excessReplicas()
-        + ", Is Open File: " + bc.isUnderConstruction()
-        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
-        + srcNode + ", Is current datanode decommissioning: "
-        + srcNode.isDecommissionInProgress() +
-        ", Is current datanode entering maintenance: "
-        + srcNode.isEnteringMaintenance());
-  }
-
-  @VisibleForTesting
-  public int getNumPendingNodes() {
-    return pendingNodes.size();
-  }
-
-  @VisibleForTesting
-  public int getNumTrackedNodes() {
-    return outOfServiceNodeBlocks.size();
-  }
-
-  @VisibleForTesting
-  public int getNumNodesChecked() {
-    return monitor.numNodesChecked;
-  }
-
-  /**
-   * Checks to see if DNs have finished decommissioning.
-   * <p/>
-   * Since this is done while holding the namesystem lock, 
-   * the amount of work per monitor tick is limited.
-   */
-  private class Monitor implements Runnable {
-    /**
-     * The maximum number of blocks to check per tick.
-     */
-    private final int numBlocksPerCheck;
-    /**
-     * The maximum number of nodes to track in outOfServiceNodeBlocks.
-     * A value of 0 means no limit.
-     */
-    private final int maxConcurrentTrackedNodes;
-    /**
-     * The number of blocks that have been checked on this tick.
-     */
-    private int numBlocksChecked = 0;
-    /**
-     * The number of blocks checked after (re)holding lock.
-     */
-    private int numBlocksCheckedPerLock = 0;
-    /**
-     * The number of nodes that have been checked on this tick. Used for 
-     * statistics.
-     */
-    private int numNodesChecked = 0;
-    /**
-     * The last datanode in outOfServiceNodeBlocks that we've processed
-     */
-    private DatanodeDescriptor iterkey = new DatanodeDescriptor(new 
-        DatanodeID("", "", "", 0, 0, 0, 0));
-
-    Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
-      this.numBlocksPerCheck = numBlocksPerCheck;
-      this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
-    }
-
-    private boolean exceededNumBlocksPerCheck() {
-      LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
-      return numBlocksChecked >= numBlocksPerCheck;
-    }
-
-    @Override
-    public void run() {
-      if (!namesystem.isRunning()) {
-        LOG.info("Namesystem is not running, skipping decommissioning checks"
-            + ".");
-        return;
-      }
-      // Reset the checked count at beginning of each iteration
-      numBlocksChecked = 0;
-      numBlocksCheckedPerLock = 0;
-      numNodesChecked = 0;
-      // Check decommission or maintenance progress.
-      namesystem.writeLock();
-      try {
-        processPendingNodes();
-        check();
-      } finally {
-        namesystem.writeUnlock();
-      }
-      if (numBlocksChecked + numNodesChecked > 0) {
-        LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
-            numNodesChecked);
-      }
-    }
-
-    /**
-     * Pop datanodes off the pending list and into decomNodeBlocks, 
-     * subject to the maxConcurrentTrackedNodes limit.
-     */
-    private void processPendingNodes() {
-      while (!pendingNodes.isEmpty() &&
-          (maxConcurrentTrackedNodes == 0 ||
-          outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
-        outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
-      }
-    }
-
-    private void check() {
-      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
-          it = new CyclicIteration<>(outOfServiceNodeBlocks,
-              iterkey).iterator();
-      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
-
-      while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
-          .isRunning()) {
-        numNodesChecked++;
-        final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
-            entry = it.next();
-        final DatanodeDescriptor dn = entry.getKey();
-        AbstractList<BlockInfo> blocks = entry.getValue();
-        boolean fullScan = false;
-        if (dn.isMaintenance() && dn.maintenanceExpired()) {
-          // If maintenance expires, stop tracking it.
-          stopMaintenance(dn);
-          toRemove.add(dn);
-          continue;
-        }
-        if (dn.isInMaintenance()) {
-          // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
-          continue;
-        }
-        if (blocks == null) {
-          // This is a newly added datanode, run through its list to schedule 
-          // under-replicated blocks for replication and collect the blocks 
-          // that are insufficiently replicated for further tracking
-          LOG.debug("Newly-added node {}, doing full scan to find " +
-              "insufficiently-replicated blocks.", dn);
-          blocks = handleInsufficientlyStored(dn);
-          outOfServiceNodeBlocks.put(dn, blocks);
-          fullScan = true;
-        } else {
-          // This is a known datanode, check if its # of insufficiently 
-          // replicated blocks has dropped to zero and if it can be decommed
-          LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
-          pruneReliableBlocks(dn, blocks);
-        }
-        if (blocks.size() == 0) {
-          if (!fullScan) {
-            // If we didn't just do a full scan, need to re-check with the 
-            // full block map.
-            //
-            // We've replicated all the known insufficiently replicated 
-            // blocks. Re-check with the full block map before finally 
-            // marking the datanode as decommissioned 
-            LOG.debug("Node {} has finished replicating current set of "
-                + "blocks, checking with the full block map.", dn);
-            blocks = handleInsufficientlyStored(dn);
-            outOfServiceNodeBlocks.put(dn, blocks);
-          }
-          // If the full scan is clean AND the node liveness is okay, 
-          // we can finally mark as decommissioned.
-          final boolean isHealthy =
-              blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
-          if (blocks.size() == 0 && isHealthy) {
-            if (dn.isDecommissionInProgress()) {
-              setDecommissioned(dn);
-              toRemove.add(dn);
-            } else if (dn.isEnteringMaintenance()) {
-              // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks
-              // to track maintenance expiration.
-              setInMaintenance(dn);
-            } else {
-              Preconditions.checkState(false,
-                  "A node is in an invalid state!");
-            }
-            LOG.debug("Node {} is sufficiently replicated and healthy, "
-                + "marked as {}.", dn.getAdminState());
-          } else {
-            LOG.debug("Node {} {} healthy."
-                + " It needs to replicate {} more blocks."
-                + " {} is still in progress.", dn,
-                isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
-          }
-        } else {
-          LOG.debug("Node {} still has {} blocks to replicate "
-              + "before it is a candidate to finish {}.",
-              dn, blocks.size(), dn.getAdminState());
-        }
-        iterkey = dn;
-      }
-      // Remove the datanodes that are decommissioned or in service after
-      // maintenance expiration.
-      for (DatanodeDescriptor dn : toRemove) {
-        Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
-            "Removing a node that is not yet decommissioned or in service!");
-        outOfServiceNodeBlocks.remove(dn);
-      }
-    }
-
-    /**
-     * Removes reliable blocks from the block list of a datanode.
-     */
-    private void pruneReliableBlocks(final DatanodeDescriptor datanode,
-        AbstractList<BlockInfo> blocks) {
-      processBlocksInternal(datanode, blocks.iterator(), null, true);
-    }
-
-    /**
-     * Returns a list of blocks on a datanode that are insufficiently
-     * replicated or require recovery, i.e. blocks that should prevent
-     * decommission.
-     * <p/>
-     * As part of this, it also schedules replication/recovery work.
-     *
-     * @return List of blocks requiring recovery
-     */
-    private AbstractList<BlockInfo> handleInsufficientlyStored(
-        final DatanodeDescriptor datanode) {
-      AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
-      processBlocksInternal(datanode, datanode.getBlockIterator(),
-          insufficient, false);
-      return insufficient;
-    }
-
-    /**
-     * Used while checking if decommission-in-progress datanodes can be marked
-     * as decommissioned. Combines shared logic of 
-     * pruneReliableBlocks and handleInsufficientlyStored.
-     *
-     * @param datanode                    Datanode
-     * @param it                          Iterator over the blocks on the
-     *                                    datanode
-     * @param insufficientList            Return parameter. If it's not null,
-     *                                    will contain the insufficiently
-     *                                    replicated-blocks from the list.
-     * @param pruneReliableBlocks         whether to remove blocks reliable
-     *                                    enough from the iterator
-     */
-    private void processBlocksInternal(
-        final DatanodeDescriptor datanode,
-        final Iterator<BlockInfo> it,
-        final List<BlockInfo> insufficientList,
-        boolean pruneReliableBlocks) {
-      boolean firstReplicationLog = true;
-      // Low redundancy in UC Blocks only
-      int lowRedundancyInOpenFiles = 0;
-      // All low redundancy blocks. Includes lowRedundancyInOpenFiles.
-      int lowRedundancyBlocks = 0;
-      // All maintenance and decommission replicas.
-      int outOfServiceOnlyReplicas = 0;
-      while (it.hasNext()) {
-        if (insufficientList == null
-            && numBlocksCheckedPerLock >= numBlocksPerCheck) {
-          // During fullscan insufficientlyReplicated will NOT be null, iterator
-          // will be DN's iterator. So should not yield lock, otherwise
-          // ConcurrentModificationException could occur.
-          // Once the fullscan done, iterator will be a copy. So can yield the
-          // lock.
-          // Yielding is required in case of block number is greater than the
-          // configured per-iteration-limit.
-          namesystem.writeUnlock();
-          try {
-            LOG.debug("Yielded lock during decommission check");
-            Thread.sleep(0, 500);
-          } catch (InterruptedException ignored) {
-            return;
-          }
-          // reset
-          numBlocksCheckedPerLock = 0;
-          namesystem.writeLock();
-        }
-        numBlocksChecked++;
-        numBlocksCheckedPerLock++;
-        final BlockInfo block = it.next();
-        // Remove the block from the list if it's no longer in the block map,
-        // e.g. the containing file has been deleted
-        if (blockManager.blocksMap.getStoredBlock(block) == null) {
-          LOG.trace("Removing unknown block {}", block);
-          it.remove();
-          continue;
-        }
-
-        long bcId = block.getBlockCollectionId();
-        if (bcId == INodeId.INVALID_INODE_ID) {
-          // Orphan block, will be invalidated eventually. Skip.
-          continue;
-        }
-
-        final BlockCollection bc = blockManager.getBlockCollection(block);
-        final NumberReplicas num = blockManager.countNodes(block);
-        final int liveReplicas = num.liveReplicas();
-
-        // Schedule low redundancy blocks for reconstruction if not already
-        // pending
-        boolean isDecommission = datanode.isDecommissionInProgress();
-        boolean neededReconstruction = isDecommission ?
-            blockManager.isNeededReconstruction(block, num) :
-            blockManager.isNeededReconstructionForMaintenance(block, num);
-        if (neededReconstruction) {
-          if (!blockManager.neededReconstruction.contains(block) &&
-              blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
-              blockManager.isPopulatingReplQueues()) {
-            // Process these blocks only when active NN is out of safe mode.
-            blockManager.neededReconstruction.add(block,
-                liveReplicas, num.readOnlyReplicas(),
-                num.outOfServiceReplicas(),
-                blockManager.getExpectedRedundancyNum(block));
-          }
-        }
-
-        // Even if the block is without sufficient redundancy,
-        // it doesn't block decommission if it has sufficient redundancy.
-        if (isSufficient(block, bc, num, isDecommission)) {
-          if (pruneReliableBlocks) {
-            it.remove();
-          }
-          continue;
-        }
-
-        // We've found a block without sufficient redundancy.
-        if (insufficientList != null) {
-          insufficientList.add(block);
-        }
-        // Log if this is our first time through
-        if (firstReplicationLog) {
-          logBlockReplicationInfo(block, bc, datanode, num,
-              blockManager.blocksMap.getStorages(block));
-          firstReplicationLog = false;
-        }
-        // Update various counts
-        lowRedundancyBlocks++;
-        if (bc.isUnderConstruction()) {
-          lowRedundancyInOpenFiles++;
-        }
-        if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
-          outOfServiceOnlyReplicas++;
-        }
-      }
-
-      datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles,
-          lowRedundancyBlocks, outOfServiceOnlyReplicas);
-    }
-  }
-
-  @VisibleForTesting
-  void runMonitorForTest() throws ExecutionException, InterruptedException {
-    executor.submit(monitor).get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index b859148..318d8e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -471,7 +471,7 @@ public class BackupNode extends NameNode {
      * {@link LeaseManager.Monitor} protected by SafeMode.
      * {@link BlockManager.RedundancyMonitor} protected by SafeMode.
      * {@link HeartbeatManager.Monitor} protected by SafeMode.
-     * {@link DecommissionManager.Monitor} need to prohibit refreshNodes().
+     * {@link DatanodeAdminManager.Monitor} need to prohibit refreshNodes().
      * {@link PendingReconstructionBlocks.PendingReconstructionMonitor}
      * harmless, because RedundancyMonitor is muted.
      */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4caee9e..8bf2b8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -960,17 +960,17 @@
 <property>
   <name>dfs.namenode.decommission.interval</name>
   <value>30s</value>
-  <description>Namenode periodicity in seconds to check if decommission is 
-  complete. Support multiple time unit suffix(case insensitive), as described
-  in dfs.heartbeat.interval.
+  <description>Namenode periodicity in seconds to check if
+    decommission or maintenance is complete. Support multiple time unit
+    suffix(case insensitive), as described in dfs.heartbeat.interval.
   </description>
 </property>
 
 <property>
   <name>dfs.namenode.decommission.blocks.per.interval</name>
   <value>500000</value>
-  <description>The approximate number of blocks to process per 
-      decommission interval, as defined in dfs.namenode.decommission.interval.
+  <description>The approximate number of blocks to process per decommission
+    or maintenance interval, as defined in dfs.namenode.decommission.interval.
   </description>
 </property>
 
@@ -978,11 +978,12 @@
   <name>dfs.namenode.decommission.max.concurrent.tracked.nodes</name>
   <value>100</value>
   <description>
-    The maximum number of decommission-in-progress datanodes nodes that will be
-    tracked at one time by the namenode. Tracking a decommission-in-progress
-    datanode consumes additional NN memory proportional to the number of blocks
-    on the datnode. Having a conservative limit reduces the potential impact
-    of decomissioning a large number of nodes at once.
+    The maximum number of decommission-in-progress or
+    entering-maintenance datanodes that will be tracked at one time by
+    the namenode. Tracking these datanodes consumes additional NN memory
+    proportional to the number of blocks on the datanode. Having a
+    conservative limit reduces the potential impact of decommissioning or
+    maintenance of a large number of nodes at once.
       
     A value of 0 means no limit will be enforced.
   </description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index c2c6be1..ac14a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -256,9 +256,10 @@ public class TestDecommission extends AdminStatesBaseTest {
 
     startSimpleHACluster(3);
 
-    // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs.
-    // The last DN is empty. Also configure the last DN to have slow heartbeat
-    // so that it will be chosen as excess replica candidate during recommission.
+    // Step 1, create a cluster with 4 DNs. Blocks are stored on the
+    // first 3 DNs. The last DN is empty. Also configure the last DN to have
+    // slow heartbeat so that it will be chosen as excess replica candidate
+    // during recommission.
 
     // Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
     // same as # of DNs, each DN will have a replica for any block.
@@ -290,9 +291,9 @@ public class TestDecommission extends AdminStatesBaseTest {
 
     // Step 3, recommission the first DN on SBN and ANN to create excess replica
     // It recommissions the node on SBN first to create potential
-    // inconsistent state. In production cluster, such insistent state can happen
-    // even if recommission command was issued on ANN first given the async nature
-    // of the system.
+    // inconsistent state. In production cluster, such inconsistent state
+    // can happen even if recommission command was issued on ANN first given
+    // the async nature of the system.
 
     // Step 3.a, ask SBN to recomm the first DN.
     // SBN has been fixed so that it no longer invalidates excess replica during
@@ -301,10 +302,10 @@ public class TestDecommission extends AdminStatesBaseTest {
     //    1. the last DN would have been chosen as excess replica, given its
     //    heartbeat is considered old.
     //    Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
-    //    2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
+    //    2. After recommissionNode finishes, SBN has 3 live replicas (0, 1, 2)
     //    and one excess replica ( 3 )
     // After the fix,
-    //    After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
+    //    After recommissionNode finishes, SBN has 4 live replicas (0, 1, 2, 3)
     Thread.sleep(slowHeartbeatDNwaitTime);
     putNodeInService(1, decomNodeFromSBN);
 
@@ -561,7 +562,8 @@ public class TestDecommission extends AdminStatesBaseTest {
    * federated cluster.
    */
   @Test(timeout=360000)
-  public void testHostsFileFederation() throws IOException, InterruptedException {
+  public void testHostsFileFederation()
+      throws IOException, InterruptedException {
     // Test for 3 namenode federated cluster
     testHostsFile(3);
   }
@@ -598,7 +600,8 @@ public class TestDecommission extends AdminStatesBaseTest {
   }
   
   @Test(timeout=120000)
-  public void testDecommissionWithOpenfile() throws IOException, InterruptedException {
+  public void testDecommissionWithOpenfile()
+      throws IOException, InterruptedException {
     LOG.info("Starting test testDecommissionWithOpenfile");
     
     //At most 4 nodes will be decommissioned
@@ -742,14 +745,15 @@ public class TestDecommission extends AdminStatesBaseTest {
 
     // make sure the two datanodes remain in decomm in progress state
     BlockManagerTestUtil.recheckDecommissionState(dm);
-    assertTrackedAndPending(dm.getDecomManager(), 2, 0);
+    assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
   }
   
   /**
    * Tests restart of namenode while datanode hosts are added to exclude file
    **/
   @Test(timeout=360000)
-  public void testDecommissionWithNamenodeRestart()throws IOException, InterruptedException {
+  public void testDecommissionWithNamenodeRestart()
+      throws IOException, InterruptedException {
     LOG.info("Starting test testDecommissionWithNamenodeRestart");
     int numNamenodes = 1;
     int numDatanodes = 1;
@@ -914,7 +918,7 @@ public class TestDecommission extends AdminStatesBaseTest {
   
   @Test(timeout=120000)
   public void testBlocksPerInterval() throws Exception {
-    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+    org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
         .setLevel(Level.TRACE);
     // Turn the blocks per interval way down
     getConf().setInt(
@@ -927,7 +931,8 @@ public class TestDecommission extends AdminStatesBaseTest {
     final FileSystem fs = getCluster().getFileSystem();
     final DatanodeManager datanodeManager =
         getCluster().getNamesystem().getBlockManager().getDatanodeManager();
-    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+    final DatanodeAdminManager decomManager =
+        datanodeManager.getDatanodeAdminManager();
 
     // Write a 3 block file, so each node has one block. Should scan 3 nodes.
     DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
@@ -944,7 +949,7 @@ public class TestDecommission extends AdminStatesBaseTest {
   }
 
   private void doDecomCheck(DatanodeManager datanodeManager,
-      DecommissionManager decomManager, int expectedNumCheckedNodes)
+      DatanodeAdminManager decomManager, int expectedNumCheckedNodes)
       throws IOException, ExecutionException, InterruptedException {
     // Decom all nodes
     ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
@@ -965,7 +970,7 @@ public class TestDecommission extends AdminStatesBaseTest {
 
   @Test(timeout=120000)
   public void testPendingNodes() throws Exception {
-    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+    org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
         .setLevel(Level.TRACE);
     // Only allow one node to be decom'd at a time
     getConf().setInt(
@@ -978,7 +983,8 @@ public class TestDecommission extends AdminStatesBaseTest {
     final FileSystem fs = getCluster().getFileSystem();
     final DatanodeManager datanodeManager =
         getCluster().getNamesystem().getBlockManager().getDatanodeManager();
-    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+    final DatanodeAdminManager decomManager =
+        datanodeManager.getDatanodeAdminManager();
 
     // Keep a file open to prevent decom from progressing
     HdfsDataOutputStream open1 =
@@ -1014,7 +1020,7 @@ public class TestDecommission extends AdminStatesBaseTest {
     assertTrackedAndPending(decomManager, 1, 0);
   }
 
-  private void assertTrackedAndPending(DecommissionManager decomManager,
+  private void assertTrackedAndPending(DatanodeAdminManager decomManager,
       int tracked, int pending) {
     assertEquals("Unexpected number of tracked nodes", tracked,
         decomManager.getNumTrackedNodes());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 77e2ffb..7ee766f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -328,7 +328,7 @@ public class BlockManagerTestUtil {
    */
   public static void recheckDecommissionState(DatanodeManager dm)
       throws ExecutionException, InterruptedException {
-    dm.getDecomManager().runMonitorForTest();
+    dm.getDatanodeAdminManager().runMonitorForTest();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
index 4ecfd50..aaa4899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -330,8 +330,9 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     // start decommissioning h9
     boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
     Assert.assertFalse(satisfied);
-    final DecommissionManager decomManager =
-        (DecommissionManager) Whitebox.getInternalState(dm, "decomManager");
+    final DatanodeAdminManager decomManager =
+        (DatanodeAdminManager) Whitebox.getInternalState(
+            dm, "datanodeAdminManager");
     cluster.getNamesystem().writeLock();
     try {
       dn9.stopDecommission();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index bcd8245..fef0b45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -100,7 +100,7 @@ public class TestReplicationPolicyConsiderLoad
       // returns false
       for (int i = 0; i < 3; i++) {
         DatanodeDescriptor d = dataNodes[i];
-        dnManager.getDecomManager().startDecommission(d);
+        dnManager.getDatanodeAdminManager().startDecommission(d);
         d.setDecommissioned();
       }
       assertEquals((double)load/3, dnManager.getFSClusterStats()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 11d7431..cfebff7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -100,7 +100,7 @@ public class TestDecommissioningStatus {
     fileSys = cluster.getFileSystem();
     cluster.getNamesystem().getBlockManager().getDatanodeManager()
         .setHeartbeatExpireInterval(3000);
-    Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
+    Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG);
     LOG = Logger.getLogger(TestDecommissioningStatus.class);
   }
 
@@ -344,7 +344,7 @@ public class TestDecommissioningStatus {
    */
   @Test(timeout=120000)
   public void testDecommissionDeadDN() throws Exception {
-    Logger log = Logger.getLogger(DecommissionManager.class);
+    Logger log = Logger.getLogger(DatanodeAdminManager.class);
     log.setLevel(Level.DEBUG);
     DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
     String dnName = dnID.getXferAddr();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index eab1199..205593f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -156,11 +156,11 @@ public class TestDefaultBlockPlacementPolicy {
     DatanodeDescriptor dnd3 = dnm.getDatanode(
         cluster.getDataNodes().get(3).getDatanodeId());
     assertEquals(dnd3.getNetworkLocation(), clientRack);
-    dnm.getDecomManager().startDecommission(dnd3);
+    dnm.getDatanodeAdminManager().startDecommission(dnd3);
     try {
       testPlacement(clientMachine, clientRack, false);
     } finally {
-      dnm.getDecomManager().stopDecommission(dnd3);
+      dnm.getDatanodeAdminManager().stopDecommission(dnd3);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message