hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject [23/50] [abbrv] hadoop git commit: HDFS-9392. Admins support for maintenance state. Contributed by Ming Ma.
Date Tue, 06 Sep 2016 16:43:56 GMT
HDFS-9392. Admins support for maintenance state. Contributed by Ming Ma.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9dcbdbdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9dcbdbdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9dcbdbdb

Branch: refs/heads/HADOOP-13345
Commit: 9dcbdbdb5a34d85910707f81ebc1bb1f81c99978
Parents: c4ee691
Author: Ming Ma <mingma@apache.org>
Authored: Tue Aug 30 14:00:13 2016 -0700
Committer: Ming Ma <mingma@apache.org>
Committed: Tue Aug 30 14:00:13 2016 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/DatanodeAdminProperties.java  |  19 +
 .../hadoop/hdfs/protocol/DatanodeInfo.java      |  27 +-
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   2 +-
 .../CombinedHostFileManager.java                |  23 +
 .../server/blockmanagement/DatanodeManager.java |  33 +-
 .../server/blockmanagement/DatanodeStats.java   |  10 +-
 .../blockmanagement/DecommissionManager.java    | 101 +++-
 .../blockmanagement/HeartbeatManager.java       |  27 +
 .../blockmanagement/HostConfigManager.java      |   7 +
 .../server/blockmanagement/HostFileManager.java |   6 +
 .../hdfs/server/namenode/FSNamesystem.java      |  29 +
 .../namenode/metrics/FSNamesystemMBean.java     |  15 +
 .../apache/hadoop/hdfs/AdminStatesBaseTest.java | 375 ++++++++++++
 .../apache/hadoop/hdfs/TestDecommission.java    | 592 ++++++-------------
 .../hadoop/hdfs/TestMaintenanceState.java       | 310 ++++++++++
 .../namenode/TestDecommissioningStatus.java     |   2 +-
 .../hadoop/hdfs/util/HostsFileWriter.java       |  55 +-
 .../hdfs/util/TestCombinedHostsFileReader.java  |   2 +-
 .../src/test/resources/dfs.hosts.json           |   2 +
 19 files changed, 1165 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
index 9f7b983..2abed81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
@@ -33,6 +33,7 @@ public class DatanodeAdminProperties {
   private int port;
   private String upgradeDomain;
   private AdminStates adminState = AdminStates.NORMAL;
+  private long maintenanceExpireTimeInMS = Long.MAX_VALUE;
 
   /**
    * Return the host name of the datanode.
@@ -97,4 +98,22 @@ public class DatanodeAdminProperties {
   public void setAdminState(final AdminStates adminState) {
     this.adminState = adminState;
   }
+
+  /**
+   * Get the maintenance expiration time in milliseconds.
+   * @return the maintenance expiration time in milliseconds.
+   */
+  public long getMaintenanceExpireTimeInMS() {
+    return this.maintenanceExpireTimeInMS;
+  }
+
+  /**
+   * Set the maintenance expiration time in milliseconds.
+   * @param maintenanceExpireTimeInMS
+   *        the maintenance expiration time in milliseconds.
+   */
+  public void setMaintenanceExpireTimeInMS(
+      final long maintenanceExpireTimeInMS) {
+    this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS;
+  }
 }
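
For orientation, here is a hedged sketch of populating this bean with the new field. Only setAdminState and the two maintenance accessors appear in this hunk; setHostName and setPort are assumed from the existing class, and the host name is hypothetical. AdminStates is DatanodeInfo.AdminStates.

    import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;

    public class MaintenanceEntrySketch {
      public static DatanodeAdminProperties inMaintenance() {
        DatanodeAdminProperties props = new DatanodeAdminProperties();
        props.setHostName("dn1.example.com");  // hypothetical host
        props.setPort(50010);                  // 0 would act as a port wildcard
        props.setAdminState(AdminStates.IN_MAINTENANCE);
        // Long.MAX_VALUE is the field default: maintenance that never expires.
        props.setMaintenanceExpireTimeInMS(Long.MAX_VALUE);
        return props;
      }
    }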

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
index e04abdd..cd32a53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
@@ -83,6 +83,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
   }
 
   protected AdminStates adminState;
+  private long maintenanceExpireTimeInMS;
 
   public DatanodeInfo(DatanodeInfo from) {
     super(from);
@@ -499,17 +500,28 @@ public class DatanodeInfo extends DatanodeID implements Node {
   }
 
   /**
-   * Put a node to maintenance mode.
+   * Start the maintenance operation.
    */
   public void startMaintenance() {
-    adminState = AdminStates.ENTERING_MAINTENANCE;
+    this.adminState = AdminStates.ENTERING_MAINTENANCE;
   }
 
   /**
-   * Put a node to maintenance mode.
+   * Put a node directly into maintenance mode.
    */
   public void setInMaintenance() {
-    adminState = AdminStates.IN_MAINTENANCE;
+    this.adminState = AdminStates.IN_MAINTENANCE;
+  }
+
+  /**
+   * @param maintenanceExpireTimeInMS the time, in milliseconds, until which
+   *        the DataNode remains in maintenance mode.
+   */
+  public void setMaintenanceExpireTimeInMS(long maintenanceExpireTimeInMS) {
+    this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS;
+  }
+
+  public long getMaintenanceExpireTimeInMS() {
+    return this.maintenanceExpireTimeInMS;
   }
 
   /**
@@ -519,6 +531,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
     adminState = null;
   }
 
+  public static boolean maintenanceNotExpired(long maintenanceExpireTimeInMS) {
+    return Time.monotonicNow() < maintenanceExpireTimeInMS;
+  }
+
   /**
   * Returns true if the node is entering_maintenance
    */
@@ -541,6 +556,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
         adminState == AdminStates.IN_MAINTENANCE);
   }
 
+  public boolean maintenanceExpired() {
+    return !maintenanceNotExpired(this.maintenanceExpireTimeInMS);
+  }
+
   public boolean isInService() {
     return getAdminState() == AdminStates.NORMAL;
   }
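
The expiry rule added above compares the deadline against Time.monotonicNow(), so 0 always reads as already expired and Long.MAX_VALUE effectively never does. A small self-contained illustration (class name and values are hypothetical):

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.util.Time;

    public class MaintenanceExpirySketch {
      public static void main(String[] args) {
        long never = Long.MAX_VALUE;                    // DatanodeAdminProperties default
        long oneHour = Time.monotonicNow() + 3600000L;  // expires an hour from now
        long none = 0L;                                 // HostFileManager's "unsupported" value
        System.out.println(DatanodeInfo.maintenanceNotExpired(never));   // true
        System.out.println(DatanodeInfo.maintenanceNotExpired(oneHour)); // true, for now
        System.out.println(DatanodeInfo.maintenanceNotExpired(none));    // false
      }
    }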

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 8df2d54..acbc8f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -141,7 +141,7 @@ public final class HdfsConstants {
 
   // type of the datanode report
   public enum DatanodeReportType {
-    ALL, LIVE, DEAD, DECOMMISSIONING
+    ALL, LIVE, DEAD, DECOMMISSIONING, ENTERING_MAINTENANCE
   }
 
   public static final byte RS_6_3_POLICY_ID = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
index 3e913b9..6f9c35e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
@@ -148,6 +148,24 @@ public class CombinedHostFileManager extends HostConfigManager {
       };
     }
 
+    synchronized long getMaintenanceExpireTimeInMS(
+        final InetSocketAddress address) {
+      Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
+          allDNs.get(address.getAddress()),
+          new Predicate<DatanodeAdminProperties>() {
+            public boolean apply(DatanodeAdminProperties input) {
+              return input.getAdminState().equals(
+                  AdminStates.IN_MAINTENANCE) &&
+                  (input.getPort() == 0 ||
+                  input.getPort() == address.getPort());
+            }
+          });
+      // If the DN isn't configured for maintenance, ignore any
+      // maintenanceExpireTimeInMS set in the config.
+      return datanode.iterator().hasNext() ?
+          datanode.iterator().next().getMaintenanceExpireTimeInMS() : 0;
+    }
+
     static class HostIterator extends UnmodifiableIterator<InetSocketAddress> {
       private final Iterator<Map.Entry<InetAddress,
           DatanodeAdminProperties>> it;
@@ -236,6 +254,11 @@ public class CombinedHostFileManager extends HostConfigManager {
     return hostProperties.getUpgradeDomain(dn.getResolvedAddress());
   }
 
+  @Override
+  public long getMaintenanceExpirationTimeInMS(DatanodeID dn) {
+    return hostProperties.getMaintenanceExpireTimeInMS(dn.getResolvedAddress());
+  }
+
   /**
    * Set the properties lists by the new instances. The
    * old instance is discarded.
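
Note that the Predicate above treats port 0 as a wildcard matching any xfer port on the host. For reference, a hedged sample of the kind of JSON hosts entry these properties are read from; the key names follow the DatanodeAdminProperties bean, and the host is hypothetical:

    {"hostName": "dn1.example.com", "port": 0, "adminState": "IN_MAINTENANCE", "maintenanceExpireTimeInMS": 9223372036854775807}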

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index da02a90..fffe29c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -552,7 +552,7 @@ public class DatanodeManager {
 
 
   /** Get a datanode descriptor given corresponding DatanodeUUID */
-  DatanodeDescriptor getDatanode(final String datanodeUuid) {
+  public DatanodeDescriptor getDatanode(final String datanodeUuid) {
     if (datanodeUuid == null) {
       return null;
     }
@@ -902,10 +902,14 @@ public class DatanodeManager {
    *
    * @param nodeReg datanode
    */
-  void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
+  void startAdminOperationIfNecessary(DatanodeDescriptor nodeReg) {
+    long maintenanceExpireTimeInMS =
+        hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg);
     // If the registered node is in exclude list, then decommission it
     if (getHostConfigManager().isExcluded(nodeReg)) {
       decomManager.startDecommission(nodeReg);
+    } else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
+      decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
     }
   }
 
@@ -1017,7 +1021,7 @@ public class DatanodeManager {
           // also treat the registration message as a heartbeat
           heartbeatManager.register(nodeS);
           incrementVersionCount(nodeS.getSoftwareVersion());
-          startDecommissioningIfExcluded(nodeS);
+          startAdminOperationIfNecessary(nodeS);
           success = true;
         } finally {
           if (!success) {
@@ -1056,7 +1060,7 @@ public class DatanodeManager {
         heartbeatManager.addDatanode(nodeDescr);
         heartbeatManager.updateDnStat(nodeDescr);
         incrementVersionCount(nodeReg.getSoftwareVersion());
-        startDecommissioningIfExcluded(nodeDescr);
+        startAdminOperationIfNecessary(nodeDescr);
         success = true;
       } finally {
         if (!success) {
@@ -1122,9 +1126,14 @@ public class DatanodeManager {
       if (!hostConfigManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
       } else {
-        if (hostConfigManager.isExcluded(node)) {
+        long maintenanceExpireTimeInMS =
+            hostConfigManager.getMaintenanceExpirationTimeInMS(node);
+        if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
+          decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
+        } else if (hostConfigManager.isExcluded(node)) {
           decomManager.startDecommission(node); // case 3.
         } else {
+          decomManager.stopMaintenance(node);
           decomManager.stopDecommission(node); // case 4.
         }
       }
@@ -1157,7 +1166,12 @@ public class DatanodeManager {
     // A decommissioning DN may be "alive" or "dead".
     return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
   }
-  
+
+  /** @return list of datanodes that are entering maintenance. */
+  public List<DatanodeDescriptor> getEnteringMaintenanceNodes() {
+    return getDatanodeListForReport(DatanodeReportType.ENTERING_MAINTENANCE);
+  }
+
   /* Getter and Setter for stale DataNodes related attributes */
 
   /**
@@ -1342,6 +1356,9 @@ public class DatanodeManager {
     final boolean listDecommissioningNodes =
         type == DatanodeReportType.ALL ||
         type == DatanodeReportType.DECOMMISSIONING;
+    final boolean listEnteringMaintenanceNodes =
+        type == DatanodeReportType.ALL ||
+        type == DatanodeReportType.ENTERING_MAINTENANCE;
 
     ArrayList<DatanodeDescriptor> nodes;
     final HostSet foundNodes = new HostSet();
@@ -1353,10 +1370,12 @@ public class DatanodeManager {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         final boolean isDead = isDatanodeDead(dn);
         final boolean isDecommissioning = dn.isDecommissionInProgress();
+        final boolean isEnteringMaintenance = dn.isEnteringMaintenance();
 
         if (((listLiveNodes && !isDead) ||
             (listDeadNodes && isDead) ||
-            (listDecommissioningNodes && isDecommissioning)) &&
+            (listDecommissioningNodes && isDecommissioning) ||
+            (listEnteringMaintenanceNodes && isEnteringMaintenance)) &&
             hostConfigManager.isIncluded(dn)) {
           nodes.add(dn);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
index bcc9bba..0d4e235 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
@@ -47,7 +47,7 @@ class DatanodeStats {
 
   synchronized void add(final DatanodeDescriptor node) {
     xceiverCount += node.getXceiverCount();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityUsed += node.getDfsUsed();
       blockPoolUsed += node.getBlockPoolUsed();
       nodesInService++;
@@ -56,7 +56,8 @@ class DatanodeStats {
       capacityRemaining += node.getRemaining();
       cacheCapacity += node.getCacheCapacity();
       cacheUsed += node.getCacheUsed();
-    } else if (!node.isDecommissioned()) {
+    } else if (node.isDecommissionInProgress() ||
+        node.isEnteringMaintenance()) {
       cacheCapacity += node.getCacheCapacity();
       cacheUsed += node.getCacheUsed();
     }
@@ -74,7 +75,7 @@ class DatanodeStats {
 
   synchronized void subtract(final DatanodeDescriptor node) {
     xceiverCount -= node.getXceiverCount();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityUsed -= node.getDfsUsed();
       blockPoolUsed -= node.getBlockPoolUsed();
       nodesInService--;
@@ -83,7 +84,8 @@ class DatanodeStats {
       capacityRemaining -= node.getRemaining();
       cacheCapacity -= node.getCacheCapacity();
       cacheUsed -= node.getCacheUsed();
-    } else if (!node.isDecommissioned()) {
+    } else if (node.isDecommissionInProgress() ||
+        node.isEnteringMaintenance()) {
       cacheCapacity -= node.getCacheCapacity();
       cacheUsed -= node.getCacheUsed();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index ec6d9ba..c456aba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -86,8 +86,11 @@ public class DecommissionManager {
   private final ScheduledExecutorService executor;
 
   /**
-   * Map containing the decommission-in-progress datanodes that are being
-   * tracked so they can be be marked as decommissioned.
+   * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
+   * datanodes that are being tracked so they can be marked as
+   * DECOMMISSIONED or IN_MAINTENANCE. Even after a node is marked
+   * IN_MAINTENANCE, it remains in the map until its maintenance
+   * expiration is detected during a monitor tick.
    * <p/>
    * This holds a set of references to the under-replicated blocks on the DN at
    * the time the DN is added to the map, i.e. the blocks that are preventing
@@ -102,12 +105,12 @@ public class DecommissionManager {
    * another check is done with the actual block map.
    */
   private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
-      decomNodeBlocks;
+      outOfServiceNodeBlocks;
 
   /**
-   * Tracking a node in decomNodeBlocks consumes additional memory. To limit
-   * the impact on NN memory consumption, we limit the number of nodes in 
-   * decomNodeBlocks. Additional nodes wait in pendingNodes.
+   * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
+   * limit the impact on NN memory consumption, we limit the number of nodes in
+   * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
    */
   private final Queue<DatanodeDescriptor> pendingNodes;
 
@@ -122,7 +125,7 @@ public class DecommissionManager {
     executor = Executors.newScheduledThreadPool(1,
         new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
             .setDaemon(true).build());
-    decomNodeBlocks = new TreeMap<>();
+    outOfServiceNodeBlocks = new TreeMap<>();
     pendingNodes = new LinkedList<>();
   }
 
@@ -222,13 +225,56 @@ public class DecommissionManager {
       }
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
-      decomNodeBlocks.remove(node);
+      outOfServiceNodeBlocks.remove(node);
     } else {
       LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
           node, node.getAdminState());
     }
   }
 
+  /**
+   * Start maintenance of the specified datanode.
+   * @param node the datanode to put into maintenance
+   * @param maintenanceExpireTimeInMS the deadline, in milliseconds, after
+   *        which the maintenance expires
+   */
+  @VisibleForTesting
+  public void startMaintenance(DatanodeDescriptor node,
+      long maintenanceExpireTimeInMS) {
+    // Even if the node is already in maintenance, we still need to adjust
+    // the expiration time.
+    node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
+    if (!node.isMaintenance()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.startMaintenance(node);
+      pendingNodes.add(node);
+    } else {
+      LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
+          node, node.getAdminState());
+    }
+  }
+
+  /**
+   * Stop maintenance of the specified datanode.
+   * @param node the datanode whose maintenance is being stopped
+   */
+  @VisibleForTesting
+  public void stopMaintenance(DatanodeDescriptor node) {
+    if (node.isMaintenance()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.stopMaintenance(node);
+
+      // TODO HDFS-9390 remove replicas from block maps
+      // or handle over replicated blocks.
+
+      // Remove from tracking in DecommissionManager
+      pendingNodes.remove(node);
+      outOfServiceNodeBlocks.remove(node);
+    } else {
+      LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
+          node, node.getAdminState());
+    }
+  }
+
   private void setDecommissioned(DatanodeDescriptor dn) {
     dn.setDecommissioned();
     LOG.info("Decommissioning complete for node {}", dn);
@@ -313,7 +359,7 @@ public class DecommissionManager {
 
   @VisibleForTesting
   public int getNumTrackedNodes() {
-    return decomNodeBlocks.size();
+    return outOfServiceNodeBlocks.size();
   }
 
   @VisibleForTesting
@@ -333,8 +379,8 @@ public class DecommissionManager {
      */
     private final int numBlocksPerCheck;
     /**
-     * The maximum number of nodes to track in decomNodeBlocks. A value of 0
-     * means no limit.
+     * The maximum number of nodes to track in outOfServiceNodeBlocks.
+     * A value of 0 means no limit.
      */
     private final int maxConcurrentTrackedNodes;
     /**
@@ -347,7 +393,7 @@ public class DecommissionManager {
      */
     private int numNodesChecked = 0;
     /**
-     * The last datanode in decomNodeBlocks that we've processed
+     * The last datanode in outOfServiceNodeBlocks that we've processed
      */
     private DatanodeDescriptor iterkey = new DatanodeDescriptor(new 
         DatanodeID("", "", "", 0, 0, 0, 0));
@@ -393,14 +439,15 @@ public class DecommissionManager {
     private void processPendingNodes() {
       while (!pendingNodes.isEmpty() &&
           (maxConcurrentTrackedNodes == 0 ||
-           decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
-        decomNodeBlocks.put(pendingNodes.poll(), null);
+          outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+        outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
       }
     }
 
     private void check() {
       final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
-          it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
+          it = new CyclicIteration<>(outOfServiceNodeBlocks,
+              iterkey).iterator();
       final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
 
       while (it.hasNext() && !exceededNumBlocksPerCheck()) {
@@ -410,6 +457,17 @@ public class DecommissionManager {
         final DatanodeDescriptor dn = entry.getKey();
         AbstractList<BlockInfo> blocks = entry.getValue();
         boolean fullScan = false;
+        if (dn.isMaintenance()) {
+          // TODO HDFS-9390 make sure blocks are minimally replicated
+          // before transitioning the node to IN_MAINTENANCE state.
+
+          // If maintenance expires, stop tracking it.
+          if (dn.maintenanceExpired()) {
+            stopMaintenance(dn);
+            toRemove.add(dn);
+          }
+          continue;
+        }
         if (blocks == null) {
           // This is a newly added datanode, run through its list to schedule 
           // under-replicated blocks for replication and collect the blocks 
@@ -417,7 +475,7 @@ public class DecommissionManager {
           LOG.debug("Newly-added node {}, doing full scan to find " +
               "insufficiently-replicated blocks.", dn);
           blocks = handleInsufficientlyStored(dn);
-          decomNodeBlocks.put(dn, blocks);
+          outOfServiceNodeBlocks.put(dn, blocks);
           fullScan = true;
         } else {
           // This is a known datanode, check if its # of insufficiently 
@@ -436,7 +494,7 @@ public class DecommissionManager {
             LOG.debug("Node {} has finished replicating current set of "
                 + "blocks, checking with the full block map.", dn);
             blocks = handleInsufficientlyStored(dn);
-            decomNodeBlocks.put(dn, blocks);
+            outOfServiceNodeBlocks.put(dn, blocks);
           }
           // If the full scan is clean AND the node liveness is okay, 
           // we can finally mark as decommissioned.
@@ -460,11 +518,12 @@ public class DecommissionManager {
         }
         iterkey = dn;
       }
-      // Remove the datanodes that are decommissioned
+      // Remove the datanodes that are decommissioned or in service after
+      // maintenance expiration.
       for (DatanodeDescriptor dn : toRemove) {
-        Preconditions.checkState(dn.isDecommissioned(),
-            "Removing a node that is not yet decommissioned!");
-        decomNodeBlocks.remove(dn);
+        Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
+            "Removing a node that is not yet decommissioned or in service!");
+        outOfServiceNodeBlocks.remove(dn);
       }
     }
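
To make the new flow in check() concrete: maintenance nodes stay in outOfServiceNodeBlocks past the IN_MAINTENANCE transition and are only dropped once their deadline passes. A hedged, self-contained model of that branch, with String and Long standing in for DatanodeDescriptor and its expiry:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;

    public class MonitorTickSketch {
      // Drop entries whose maintenance deadline has passed, mirroring the
      // stopMaintenance()/toRemove handling in the monitor above.
      static void tick(TreeMap<String, Long> outOfService, long now) {
        Iterator<Map.Entry<String, Long>> it = outOfService.entrySet().iterator();
        while (it.hasNext()) {
          if (now >= it.next().getValue()) {  // maintenanceExpired()
            it.remove();
          }
        }
      }

      public static void main(String[] args) {
        TreeMap<String, Long> tracked = new TreeMap<>();
        tracked.put("dn1", 100L);            // hypothetical deadline, already past
        tracked.put("dn2", Long.MAX_VALUE);  // never expires
        tick(tracked, 200L);
        System.out.println(tracked.keySet()); // [dn2]
      }
    }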
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index cec4a1a..d728ee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -265,6 +265,33 @@ class HeartbeatManager implements DatanodeStatistics {
     }
   }
 
+  synchronized void startMaintenance(final DatanodeDescriptor node) {
+    if (!node.isAlive()) {
+      LOG.info("Dead node {} is put in maintenance state immediately.", node);
+      node.setInMaintenance();
+    } else if (node.isDecommissioned()) {
+      LOG.info("Decommissioned node " + node + " is put in maintenance state"
+          + " immediately.");
+      node.setInMaintenance();
+    } else {
+      stats.subtract(node);
+      node.startMaintenance();
+      stats.add(node);
+    }
+  }
+
+  synchronized void stopMaintenance(final DatanodeDescriptor node) {
+    LOG.info("Stopping maintenance of {} node {}",
+        node.isAlive() ? "live" : "dead", node);
+    if (!node.isAlive()) {
+      node.stopMaintenance();
+    } else {
+      stats.subtract(node);
+      node.stopMaintenance();
+      stats.add(node);
+    }
+  }
+
   synchronized void stopDecommission(final DatanodeDescriptor node) {
     LOG.info("Stopping decommissioning of {} node {}",
         node.isAlive() ? "live" : "dead", node);
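
The subtract/mutate/add bracket used in startMaintenance and stopMaintenance above is what keeps the aggregate DatanodeStats consistent across an admin-state flip: a node's capacity and cache contributions depend on its state, so they are retired under the old state and re-counted under the new one. A hedged generic restatement with stand-in types:

    // Stand-ins for DatanodeStats and DatanodeDescriptor (sketch only).
    public class StatsFlipSketch {
      interface Stats {
        void add(Object node);
        void subtract(Object node);
      }

      static void flipAdminState(Stats stats, Object node, Runnable transition) {
        stats.subtract(node);  // retire contributions counted under the old state
        transition.run();      // e.g. node.startMaintenance()
        stats.add(node);       // re-count under the new state
      }
    }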

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
index f28ed29..0ab4ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
@@ -77,4 +77,11 @@ public abstract class HostConfigManager implements Configurable {
    * @return the upgrade domain of dn.
    */
   public abstract String getUpgradeDomain(DatanodeID dn);
+
+  /**
+   * Get the maintenance expiration time in milliseconds.
+   * @param dn the DatanodeID of the datanode
+   * @return the maintenance expiration time of dn.
+   */
+  public abstract long getMaintenanceExpirationTimeInMS(DatanodeID dn);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
index bcfebf2..59f907f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
@@ -138,6 +138,12 @@ public class HostFileManager extends HostConfigManager {
     return null;
   }
 
+  @Override
+  public long getMaintenanceExpirationTimeInMS(DatanodeID dn) {
+    // The include/exclude files based config doesn't support maintenance mode.
+    return 0;
+  }
+
   /**
    * Read the includes and excludes lists from the named files.  Any previous
    * includes and excludes lists are discarded.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 52fbaa7..f4b742e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7079,5 +7079,34 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return blockManager.getBytesInFuture();
   }
 
+
+  @Override // FSNamesystemMBean
+  public int getNumInMaintenanceLiveDataNodes() {
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
+    int liveInMaintenance = 0;
+    for (DatanodeDescriptor node : live) {
+      liveInMaintenance += node.isInMaintenance() ? 1 : 0;
+    }
+    return liveInMaintenance;
+  }
+
+  @Override // FSNamesystemMBean
+  public int getNumInMaintenanceDeadDataNodes() {
+    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
+    int deadInMaintenance = 0;
+    for (DatanodeDescriptor node : dead) {
+      deadInMaintenance += node.isInMaintenance() ? 1 : 0;
+    }
+    return deadInMaintenance;
+  }
+
+  @Override // FSNamesystemMBean
+  public int getNumEnteringMaintenanceDataNodes() {
+    return getBlockManager().getDatanodeManager().getEnteringMaintenanceNodes()
+        .size();
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
index b314f7f..f1e7515 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
@@ -208,4 +208,19 @@ public interface FSNamesystemMBean {
    * Return total time spent doing sync operations on FSEditLog.
    */
   String getTotalSyncTimes();
+
+  /**
+   * @return Number of IN_MAINTENANCE live data nodes
+   */
+  int getNumInMaintenanceLiveDataNodes();
+
+  /**
+   * @return Number of IN_MAINTENANCE dead data nodes
+   */
+  int getNumInMaintenanceDeadDataNodes();
+
+  /**
+   * @return Number of ENTERING_MAINTENANCE data nodes
+   */
+  int getNumEnteringMaintenanceDataNodes();
 }
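
The three new counters surface through JMX like the rest of FSNamesystemMBean. A hedged probe, assuming the standard FSNamesystemState bean name the NameNode registers and that we run inside (or are connected to) the NameNode JVM:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class MaintenanceMetricsProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName fsState =
            new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState");
        // JMX attribute names are the getter names minus the "get" prefix.
        System.out.println("entering maintenance: "
            + mbs.getAttribute(fsState, "NumEnteringMaintenanceDataNodes"));
        System.out.println("in maintenance (live): "
            + mbs.getAttribute(fsState, "NumInMaintenanceLiveDataNodes"));
        System.out.println("in maintenance (dead): "
            + mbs.getAttribute(fsState, "NumInMaintenanceDeadDataNodes"));
      }
    }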

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
new file mode 100644
index 0000000..0698628
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * This class provides utilities for testing the admin operations on nodes.
+ */
+public class AdminStatesBaseTest {
+  public static final Log LOG = LogFactory.getLog(AdminStatesBaseTest.class);
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  static final int fileSize = 16384;
+  static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
+  static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
+  static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
+
+  final private Random myrand = new Random();
+
+  private HostsFileWriter hostsFileWriter;
+  private Configuration conf;
+  private MiniDFSCluster cluster = null;
+  private boolean useCombinedHostFileManager = false;
+
+  protected void setUseCombinedHostFileManager() {
+    useCombinedHostFileManager = true;
+  }
+
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  protected MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    // Set up the hosts/exclude files.
+    hostsFileWriter = new HostsFileWriter();
+    conf = new HdfsConfiguration();
+
+    if (useCombinedHostFileManager) {
+      conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+          CombinedHostFileManager.class, HostConfigManager.class);
+    }
+
+    // Setup conf
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        200);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        BLOCKREPORT_INTERVAL_MSEC);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+        NAMENODE_REPLICATION_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
+
+    hostsFileWriter.initialize(conf, "temp/admin");
+  }
+
+  @After
+  public void teardown() throws IOException {
+    hostsFileWriter.cleanup();
+    shutdownCluster();
+  }
+
+  protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+    throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, blockSize);
+    byte[] buffer = new byte[blockSize*numOfBlocks];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Decommission the DN, or put it into maintenance, identified by
+   * datanodeUuid; a random node is picked if datanodeUuid is null.
+   * Then wait for the node to reach the given {@code waitForState}.
+   */
+  protected DatanodeInfo takeNodeOutofService(int nnIndex,
+      String datanodeUuid, long maintenanceExpirationInMS,
+      ArrayList<DatanodeInfo> decommissionedNodes,
+      AdminStates waitForState) throws IOException {
+    return takeNodeOutofService(nnIndex, datanodeUuid,
+        maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
+  }
+
+  /*
+   * Decommission the DN, or put it into maintenance, identified by
+   * datanodeUuid; pick a random node if datanodeUuid == null. Then
+   * wait for the node to reach the given {@code waitForState}.
+   */
+  protected DatanodeInfo takeNodeOutofService(int nnIndex,
+      String datanodeUuid, long maintenanceExpirationInMS,
+      List<DatanodeInfo> decommissionedNodes,
+      Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
+      throws IOException {
+    DFSClient client = getDfsClient(nnIndex);
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
+    boolean isDecommissionRequest =
+        waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
+        waitForState == AdminStates.DECOMMISSIONED;
+
+    //
+    // pick one datanode randomly unless the caller specifies one.
+    //
+    int index = 0;
+    if (datanodeUuid == null) {
+      boolean found = false;
+      while (!found) {
+        index = myrand.nextInt(info.length);
+        if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
+            (!isDecommissionRequest && !info[index].isInMaintenance())) {
+          found = true;
+        }
+      }
+    } else {
+      // The caller specifies a DN
+      for (; index < info.length; index++) {
+        if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
+          break;
+        }
+      }
+      if (index == info.length) {
+        throw new IOException("invalid datanodeUuid " + datanodeUuid);
+      }
+    }
+    String nodename = info[index].getXferAddr();
+    LOG.info("Taking node: " + nodename + " out of service");
+
+    ArrayList<String> decommissionNodes = new ArrayList<String>();
+    if (decommissionedNodes != null) {
+      for (DatanodeInfo dn : decommissionedNodes) {
+        decommissionNodes.add(dn.getName());
+      }
+    }
+    Map<String, Long> maintenanceNodes = new HashMap<>();
+    if (inMaintenanceNodes != null) {
+      for (Map.Entry<DatanodeInfo, Long> dn :
+          inMaintenanceNodes.entrySet()) {
+        maintenanceNodes.put(dn.getKey().getName(), dn.getValue());
+      }
+    }
+
+    if (isDecommissionRequest) {
+      decommissionNodes.add(nodename);
+    } else {
+      maintenanceNodes.put(nodename, maintenanceExpirationInMS);
+    }
+
+    // write node names into the json host file.
+    hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
+    refreshNodes(nnIndex);
+    DatanodeInfo ret = NameNodeAdapter.getDatanode(
+        cluster.getNamesystem(nnIndex), info[index]);
+    waitNodeState(ret, waitForState);
+    return ret;
+  }
+
+  /* Ask a specific NN to put the datanode in service and wait for it
+   * to reach the NORMAL state.
+   */
+  protected void putNodeInService(int nnIndex,
+      DatanodeInfo outOfServiceNode) throws IOException {
+    LOG.info("Putting node: " + outOfServiceNode + " in service");
+    ArrayList<String> decommissionNodes = new ArrayList<>();
+    Map<String, Long> maintenanceNodes = new HashMap<>();
+
+    DatanodeManager dm =
+        cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager();
+    List<DatanodeDescriptor> nodes =
+        dm.getDatanodeListForReport(DatanodeReportType.ALL);
+    for (DatanodeDescriptor node : nodes) {
+      if (node.isMaintenance()) {
+        maintenanceNodes.put(node.getName(),
+            node.getMaintenanceExpireTimeInMS());
+      } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+        decommissionNodes.add(node.getName());
+      }
+    }
+    decommissionNodes.remove(outOfServiceNode.getName());
+    maintenanceNodes.remove(outOfServiceNode.getName());
+
+    hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
+    refreshNodes(nnIndex);
+    waitNodeState(outOfServiceNode, AdminStates.NORMAL);
+  }
+
+  protected void putNodeInService(int nnIndex,
+      String datanodeUuid) throws IOException {
+    DatanodeInfo datanodeInfo =
+        getDatanodeDescriptor(cluster.getNamesystem(nnIndex), datanodeUuid);
+    putNodeInService(nnIndex, datanodeInfo);
+  }
+
+  /*
+   * Wait until the node transitions to the expected state.
+   */
+  protected void waitNodeState(DatanodeInfo node,
+      AdminStates state) {
+    boolean done = state == node.getAdminState();
+    while (!done) {
+      LOG.info("Waiting for node " + node + " to change state to "
+          + state + " current state: " + node.getAdminState());
+      try {
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = state == node.getAdminState();
+    }
+    LOG.info("node " + node + " reached the state " + state);
+  }
+
+  protected void initIncludeHost(String hostNameAndPort) throws IOException {
+    hostsFileWriter.initIncludeHost(hostNameAndPort);
+  }
+
+  protected void initIncludeHosts(String[] hostNameAndPorts)
+      throws IOException {
+    hostsFileWriter.initIncludeHosts(hostNameAndPorts);
+  }
+
+  protected void initExcludeHost(String hostNameAndPort) throws IOException {
+    hostsFileWriter.initExcludeHost(hostNameAndPort);
+  }
+
+  protected void initExcludeHosts(List<String> hostNameAndPorts)
+      throws IOException {
+    hostsFileWriter.initExcludeHosts(hostNameAndPorts);
+  }
+
+  /* Get DFSClient to the namenode */
+  protected DFSClient getDfsClient(final int nnIndex) throws IOException {
+    return new DFSClient(cluster.getNameNode(nnIndex).getNameNodeAddress(),
+        conf);
+  }
+
+  /* Validate cluster has expected number of datanodes */
+  protected static void validateCluster(DFSClient client, int numDNs)
+      throws IOException {
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+    assertEquals("Number of Datanodes ", numDNs, info.length);
+  }
+
+  /**
+   * Start a MiniDFSCluster.
+   * @throws IOException
+   */
+  protected void startCluster(int numNameNodes, int numDatanodes,
+      boolean setupHostsFile, long[] nodesCapacity,
+      boolean checkDataNodeHostConfig) throws IOException {
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
+        .numDataNodes(numDatanodes);
+    if (setupHostsFile) {
+      builder.setupHostsFile(setupHostsFile);
+    }
+    if (nodesCapacity != null) {
+      builder.simulatedCapacities(nodesCapacity);
+    }
+    if (checkDataNodeHostConfig) {
+      builder.checkDataNodeHostConfig(checkDataNodeHostConfig);
+    }
+    cluster = builder.build();
+    cluster.waitActive();
+    for (int i = 0; i < numNameNodes; i++) {
+      DFSClient client = getDfsClient(i);
+      validateCluster(client, numDatanodes);
+    }
+  }
+
+  protected void startCluster(int numNameNodes, int numDatanodes)
+      throws IOException {
+    startCluster(numNameNodes, numDatanodes, false, null, false);
+  }
+
+  protected void startSimpleHACluster(int numDatanodes) throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(
+        numDatanodes).build();
+    cluster.transitionToActive(0);
+    cluster.waitActive();
+  }
+
+  protected void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  protected void refreshNodes(final int nnIndex) throws IOException {
+    cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager().
+        refreshNodes(conf);
+  }
+
+  protected DatanodeDescriptor getDatanodeDescriptor(
+      final FSNamesystem ns, final String datanodeUuid) {
+    return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
+  }
+
+  protected void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name, true);
+    assertTrue(!fileSys.exists(name));
+  }
+}
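
Finally, a hedged sketch of how a subclass (e.g. the new TestMaintenanceState, whose body is not shown in this abbreviated commit) might drive a node through maintenance with these helpers; the class and test names here are hypothetical:

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
    import org.junit.Test;

    public class MaintenanceSketchTest extends AdminStatesBaseTest {
      public MaintenanceSketchTest() {
        // Maintenance state requires the JSON-based host config.
        setUseCombinedHostFileManager();
      }

      @Test
      public void testEnterAndLeaveMaintenance() throws Exception {
        startCluster(1, 3);
        // Put one random live DN into never-expiring maintenance and wait
        // for it to begin entering that state.
        DatanodeInfo dn = takeNodeOutofService(0, null, Long.MAX_VALUE, null,
            AdminStates.ENTERING_MAINTENANCE);
        // Rewrite the hosts file without the node and wait for NORMAL;
        // teardown() shuts the cluster down.
        putNodeInService(0, dn);
      }
    }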

