From: drankye@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 06 Sep 2016 08:48:05 -0000
Subject: [27/50] [abbrv] hadoop git commit: HDFS-9392. Admins support for maintenance state. Contributed by Ming Ma.

HDFS-9392. Admins support for maintenance state. Contributed by Ming Ma.
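For context before the diff: with the JSON-based hosts file (CombinedHostFileManager), an admin requests maintenance by giving a datanode entry the IN_MAINTENANCE admin state plus an expiration time. A minimal sketch of the equivalent in-memory object, using the new expiration setter added below -- the host name, port, and one-hour window are illustrative assumptions, as are the pre-existing host/port setters on DatanodeAdminProperties:

import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.util.Time;

public class MaintenanceEntrySketch {
  public static void main(String[] args) {
    DatanodeAdminProperties dn = new DatanodeAdminProperties();
    dn.setHostName("dn1.example.com");  // illustrative host
    dn.setPort(50010);                  // port 0 would match any port
    dn.setAdminState(AdminStates.IN_MAINTENANCE);
    // Keep the node in maintenance for roughly one hour; the deadline is
    // compared against Time.monotonicNow() (see maintenanceNotExpired below).
    dn.setMaintenanceExpireTimeInMS(Time.monotonicNow() + 60L * 60 * 1000);
    System.out.println("expires at " + dn.getMaintenanceExpireTimeInMS());
  }
}

Left unset, maintenanceExpireTimeInMS defaults to Long.MAX_VALUE in this commit, i.e. maintenance that never expires.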
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9dcbdbdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9dcbdbdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9dcbdbdb

Branch: refs/heads/HADOOP-12756
Commit: 9dcbdbdb5a34d85910707f81ebc1bb1f81c99978
Parents: c4ee691
Author: Ming Ma
Authored: Tue Aug 30 14:00:13 2016 -0700
Committer: Ming Ma
Committed: Tue Aug 30 14:00:13 2016 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/DatanodeAdminProperties.java  |  19 +
 .../hadoop/hdfs/protocol/DatanodeInfo.java      |  27 +-
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   2 +-
 .../CombinedHostFileManager.java                |  23 +
 .../server/blockmanagement/DatanodeManager.java |  33 +-
 .../server/blockmanagement/DatanodeStats.java   |  10 +-
 .../blockmanagement/DecommissionManager.java    | 101 +++-
 .../blockmanagement/HeartbeatManager.java       |  27 +
 .../blockmanagement/HostConfigManager.java      |   7 +
 .../server/blockmanagement/HostFileManager.java |   6 +
 .../hdfs/server/namenode/FSNamesystem.java      |  29 +
 .../namenode/metrics/FSNamesystemMBean.java     |  15 +
 .../apache/hadoop/hdfs/AdminStatesBaseTest.java | 375 ++++++++++++
 .../apache/hadoop/hdfs/TestDecommission.java    | 592 ++++++-------------
 .../hadoop/hdfs/TestMaintenanceState.java       | 310 ++++++++++
 .../namenode/TestDecommissioningStatus.java     |   2 +-
 .../hadoop/hdfs/util/HostsFileWriter.java       |  55 +-
 .../hdfs/util/TestCombinedHostsFileReader.java  |   2 +-
 .../src/test/resources/dfs.hosts.json           |   2 +
 19 files changed, 1165 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
index 9f7b983..2abed81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
@@ -33,6 +33,7 @@ public class DatanodeAdminProperties {
   private int port;
   private String upgradeDomain;
   private AdminStates adminState = AdminStates.NORMAL;
+  private long maintenanceExpireTimeInMS = Long.MAX_VALUE;
 
   /**
    * Return the host name of the datanode.
@@ -97,4 +98,22 @@ public class DatanodeAdminProperties {
   public void setAdminState(final AdminStates adminState) {
     this.adminState = adminState;
   }
+
+  /**
+   * Get the maintenance expiration time in milliseconds.
+   * @return the maintenance expiration time in milliseconds.
+   */
+  public long getMaintenanceExpireTimeInMS() {
+    return this.maintenanceExpireTimeInMS;
+  }
+
+  /**
+   * Set the maintenance expiration time in milliseconds.
+   * @param maintenanceExpireTimeInMS
+   *          the maintenance expiration time in milliseconds.
+   */
+  public void setMaintenanceExpireTimeInMS(
+      final long maintenanceExpireTimeInMS) {
+    this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
index e04abdd..cd32a53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
@@ -83,6 +83,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
   }
 
   protected AdminStates adminState;
+  private long maintenanceExpireTimeInMS;
 
   public DatanodeInfo(DatanodeInfo from) {
     super(from);
@@ -499,17 +500,28 @@ public class DatanodeInfo extends DatanodeID implements Node {
   }
 
   /**
-   * Put a node to maintenance mode.
+   * Start the maintenance operation.
    */
   public void startMaintenance() {
-    adminState = AdminStates.ENTERING_MAINTENANCE;
+    this.adminState = AdminStates.ENTERING_MAINTENANCE;
   }
 
   /**
-   * Put a node to maintenance mode.
+   * Put a node directly to maintenance mode.
    */
   public void setInMaintenance() {
-    adminState = AdminStates.IN_MAINTENANCE;
+    this.adminState = AdminStates.IN_MAINTENANCE;
+  }
+
+  /**
+   * @param maintenanceExpireTimeInMS the time, in milliseconds, until which
+   * the DataNode stays in maintenance mode.
+   */
+  public void setMaintenanceExpireTimeInMS(long maintenanceExpireTimeInMS) {
+    this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS;
+  }
+
+  public long getMaintenanceExpireTimeInMS() {
+    return this.maintenanceExpireTimeInMS;
   }
 
   /**
@@ -519,6 +531,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
     adminState = null;
   }
 
+  public static boolean maintenanceNotExpired(long maintenanceExpireTimeInMS) {
+    return Time.monotonicNow() < maintenanceExpireTimeInMS;
+  }
   /**
    * Returns true if the node is entering_maintenance
    */
@@ -541,6 +556,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
         adminState == AdminStates.IN_MAINTENANCE);
   }
 
+  public boolean maintenanceExpired() {
+    return !maintenanceNotExpired(this.maintenanceExpireTimeInMS);
+  }
+
   public boolean isInService() {
     return getAdminState() == AdminStates.NORMAL;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 8df2d54..acbc8f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -141,7 +141,7 @@ public final class HdfsConstants {
 
   // type of the datanode report
   public enum DatanodeReportType {
-    ALL, LIVE, DEAD, DECOMMISSIONING
+    ALL, LIVE, DEAD, DECOMMISSIONING, ENTERING_MAINTENANCE
   }
 
   public static final byte
      RS_6_3_POLICY_ID = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
index 3e913b9..6f9c35e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
@@ -148,6 +148,24 @@ public class CombinedHostFileManager extends HostConfigManager {
     };
   }
 
+  synchronized long getMaintenanceExpireTimeInMS(
+      final InetSocketAddress address) {
+    Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
+        allDNs.get(address.getAddress()),
+        new Predicate<DatanodeAdminProperties>() {
+          public boolean apply(DatanodeAdminProperties input) {
+            return input.getAdminState().equals(
+                AdminStates.IN_MAINTENANCE) &&
+                (input.getPort() == 0 ||
+                input.getPort() == address.getPort());
+          }
+        });
+    // if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS
+    // set in the config.
+    return datanode.iterator().hasNext() ?
+        datanode.iterator().next().getMaintenanceExpireTimeInMS() : 0;
+  }
+
   static class HostIterator extends UnmodifiableIterator<InetSocketAddress> {
     private final Iterator<Map.Entry<InetAddress,
         DatanodeAdminProperties>> it;
@@ -236,6 +254,11 @@ public class CombinedHostFileManager extends HostConfigManager {
     return hostProperties.getUpgradeDomain(dn.getResolvedAddress());
   }
 
+  @Override
+  public long getMaintenanceExpirationTimeInMS(DatanodeID dn) {
+    return hostProperties.getMaintenanceExpireTimeInMS(dn.getResolvedAddress());
+  }
+
   /**
    * Set the properties lists by the new instances. The
    * old instance is discarded.
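The lookup above returns 0 when no matching IN_MAINTENANCE entry exists, and port 0 in an entry acts as a wildcard. A standalone paraphrase of that matching rule, using plain Java streams instead of the Guava Predicate (the class and method names here are illustrative, not part of this commit):

import java.util.List;

import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;

public class MaintenanceLookupSketch {
  /**
   * Returns the maintenance expiration of the first IN_MAINTENANCE entry
   * whose port matches (0 matches any port), or 0 if there is none --
   * mirroring CombinedHostFileManager#getMaintenanceExpireTimeInMS above.
   */
  static long expirationFor(List<DatanodeAdminProperties> entries, int port) {
    return entries.stream()
        .filter(p -> p.getAdminState().equals(AdminStates.IN_MAINTENANCE))
        .filter(p -> p.getPort() == 0 || p.getPort() == port)
        .map(DatanodeAdminProperties::getMaintenanceExpireTimeInMS)
        .findFirst()
        .orElse(0L);
  }
}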
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index da02a90..fffe29c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -552,7 +552,7 @@ public class DatanodeManager {
 
   /** Get a datanode descriptor given corresponding DatanodeUUID */
-  DatanodeDescriptor getDatanode(final String datanodeUuid) {
+  public DatanodeDescriptor getDatanode(final String datanodeUuid) {
     if (datanodeUuid == null) {
       return null;
     }
@@ -902,10 +902,14 @@ public class DatanodeManager {
    *
    * @param nodeReg datanode
    */
-  void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
+  void startAdminOperationIfNecessary(DatanodeDescriptor nodeReg) {
+    long maintenanceExpireTimeInMS =
+        hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg);
     // If the registered node is in exclude list, then decommission it
     if (getHostConfigManager().isExcluded(nodeReg)) {
       decomManager.startDecommission(nodeReg);
+    } else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
+      decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
     }
   }
 
@@ -1017,7 +1021,7 @@ public class DatanodeManager {
         // also treat the registration message as a heartbeat
         heartbeatManager.register(nodeS);
         incrementVersionCount(nodeS.getSoftwareVersion());
-        startDecommissioningIfExcluded(nodeS);
+        startAdminOperationIfNecessary(nodeS);
         success = true;
       } finally {
         if (!success) {
@@ -1056,7 +1060,7 @@ public class DatanodeManager {
       heartbeatManager.addDatanode(nodeDescr);
       heartbeatManager.updateDnStat(nodeDescr);
       incrementVersionCount(nodeReg.getSoftwareVersion());
-      startDecommissioningIfExcluded(nodeDescr);
+      startAdminOperationIfNecessary(nodeDescr);
       success = true;
     } finally {
       if (!success) {
@@ -1122,9 +1126,14 @@ public class DatanodeManager {
       if (!hostConfigManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
       } else {
-        if (hostConfigManager.isExcluded(node)) {
+        long maintenanceExpireTimeInMS =
+            hostConfigManager.getMaintenanceExpirationTimeInMS(node);
+        if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
+          decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
+        } else if (hostConfigManager.isExcluded(node)) {
           decomManager.startDecommission(node); // case 3.
         } else {
+          decomManager.stopMaintenance(node);
           decomManager.stopDecommission(node); // case 4.
         }
       }
@@ -1157,7 +1166,12 @@ public class DatanodeManager {
     // A decommissioning DN may be "alive" or "dead".
     return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
   }
-
+
+  /** @return list of datanodes that are entering maintenance.
+   */
+  public List<DatanodeDescriptor> getEnteringMaintenanceNodes() {
+    return getDatanodeListForReport(DatanodeReportType.ENTERING_MAINTENANCE);
+  }
+
   /* Getter and Setter for stale DataNodes related attributes */
 
   /**
@@ -1342,6 +1356,9 @@ public class DatanodeManager {
     final boolean listDecommissioningNodes =
         type == DatanodeReportType.ALL ||
         type == DatanodeReportType.DECOMMISSIONING;
+    final boolean listEnteringMaintenanceNodes =
+        type == DatanodeReportType.ALL ||
+        type == DatanodeReportType.ENTERING_MAINTENANCE;
     ArrayList<DatanodeDescriptor> nodes;
     final HostSet foundNodes = new HostSet();
@@ -1353,10 +1370,12 @@ public class DatanodeManager {
     for (DatanodeDescriptor dn : datanodeMap.values()) {
       final boolean isDead = isDatanodeDead(dn);
       final boolean isDecommissioning = dn.isDecommissionInProgress();
+      final boolean isEnteringMaintenance = dn.isEnteringMaintenance();
 
       if (((listLiveNodes && !isDead) ||
           (listDeadNodes && isDead) ||
-          (listDecommissioningNodes && isDecommissioning)) &&
+          (listDecommissioningNodes && isDecommissioning) ||
+          (listEnteringMaintenanceNodes && isEnteringMaintenance)) &&
           hostConfigManager.isIncluded(dn)) {
         nodes.add(dn);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
index bcc9bba..0d4e235 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
@@ -47,7 +47,7 @@ class DatanodeStats {
 
   synchronized void add(final DatanodeDescriptor node) {
     xceiverCount += node.getXceiverCount();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityUsed += node.getDfsUsed();
       blockPoolUsed += node.getBlockPoolUsed();
       nodesInService++;
@@ -56,7 +56,8 @@ class DatanodeStats {
       capacityRemaining += node.getRemaining();
       cacheCapacity += node.getCacheCapacity();
       cacheUsed += node.getCacheUsed();
-    } else if (!node.isDecommissioned()) {
+    } else if (node.isDecommissionInProgress() ||
+        node.isEnteringMaintenance()) {
       cacheCapacity += node.getCacheCapacity();
       cacheUsed += node.getCacheUsed();
     }
@@ -74,7 +75,7 @@ class DatanodeStats {
 
   synchronized void subtract(final DatanodeDescriptor node) {
     xceiverCount -= node.getXceiverCount();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityUsed -= node.getDfsUsed();
       blockPoolUsed -= node.getBlockPoolUsed();
       nodesInService--;
@@ -83,7 +84,8 @@ class DatanodeStats {
       capacityRemaining -= node.getRemaining();
       cacheCapacity -= node.getCacheCapacity();
       cacheUsed -= node.getCacheUsed();
-    } else if (!node.isDecommissioned()) {
+    } else if (node.isDecommissionInProgress() ||
+        node.isEnteringMaintenance()) {
       cacheCapacity -= node.getCacheCapacity();
       cacheUsed -= node.getCacheUsed();
     }
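The DecommissionManager hunks that follow generalize the decommission-tracking machinery to maintenance. As a preview, here is a condensed, hypothetical model of the tracking lifecycle described in the javadoc below -- pendingNodes feeding a bounded outOfServiceNodeBlocks map, drained when maintenance expires. The DN class and the limit value are simplified stand-ins, not Hadoop types:

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.TreeMap;

class TrackingSketch {
  static class DN implements Comparable<DN> {
    final String name;
    boolean maintenanceExpired;
    DN(String name) { this.name = name; }
    public int compareTo(DN o) { return name.compareTo(o.name); }
  }

  final Queue<DN> pendingNodes = new ArrayDeque<>();
  final TreeMap<DN, List<String>> outOfServiceNodeBlocks = new TreeMap<>();
  final int maxConcurrentTrackedNodes = 100;

  void startMaintenance(DN node) {
    pendingNodes.add(node);  // waits here while the tracker is full
  }

  void monitorTick() {
    // Promote pending nodes up to the tracking limit (null block list
    // marks a newly added node, as in processPendingNodes below).
    while (!pendingNodes.isEmpty()
        && outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes) {
      outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
    }
    // Stop tracking nodes whose maintenance window has expired.
    outOfServiceNodeBlocks.keySet().removeIf(dn -> dn.maintenanceExpired);
  }
}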
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index ec6d9ba..c456aba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -86,8 +86,11 @@ public class DecommissionManager {
   private final ScheduledExecutorService executor;
 
   /**
-   * Map containing the decommission-in-progress datanodes that are being
-   * tracked so they can be be marked as decommissioned.
+   * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
+   * datanodes that are being tracked so they can be marked as
+   * DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as
+   * IN_MAINTENANCE, the node remains in the map until its
+   * maintenance expiration is checked during a monitor tick.
    *
    * This holds a set of references to the under-replicated blocks on the DN at
    * the time the DN is added to the map, i.e. the blocks that are preventing
@@ -102,12 +105,12 @@ public class DecommissionManager {
    * another check is done with the actual block map.
    */
   private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
-      decomNodeBlocks;
+      outOfServiceNodeBlocks;
 
   /**
-   * Tracking a node in decomNodeBlocks consumes additional memory. To limit
-   * the impact on NN memory consumption, we limit the number of nodes in
-   * decomNodeBlocks. Additional nodes wait in pendingNodes.
+   * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
+   * limit the impact on NN memory consumption, we limit the number of nodes in
+   * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
    */
   private final Queue<DatanodeDescriptor> pendingNodes;
 
@@ -122,7 +125,7 @@ public class DecommissionManager {
     executor = Executors.newScheduledThreadPool(1,
         new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
             .setDaemon(true).build());
-    decomNodeBlocks = new TreeMap<>();
+    outOfServiceNodeBlocks = new TreeMap<>();
     pendingNodes = new LinkedList<>();
   }
 
@@ -222,13 +225,56 @@ public class DecommissionManager {
       }
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
-      decomNodeBlocks.remove(node);
+      outOfServiceNodeBlocks.remove(node);
     } else {
       LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
           node, node.getAdminState());
     }
   }
 
+  /**
+   * Start maintenance of the specified datanode.
+   * @param node
+   */
+  @VisibleForTesting
+  public void startMaintenance(DatanodeDescriptor node,
+      long maintenanceExpireTimeInMS) {
+    // Even if the node is already in maintenance, we still need to adjust
+    // the expiration time.
+    node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
+    if (!node.isMaintenance()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.startMaintenance(node);
+      pendingNodes.add(node);
+    } else {
+      LOG.trace("startMaintenance: Node {} in {}, nothing to do.",
+          node, node.getAdminState());
+    }
+  }
+
+
+  /**
+   * Stop maintenance of the specified datanode.
+   * @param node
+   */
+  @VisibleForTesting
+  public void stopMaintenance(DatanodeDescriptor node) {
+    if (node.isMaintenance()) {
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.stopMaintenance(node);
+
+      // TODO HDFS-9390 remove replicas from block maps
+      // or handle over replicated blocks.
+
+      // Remove from tracking in DecommissionManager
+      pendingNodes.remove(node);
+      outOfServiceNodeBlocks.remove(node);
+    } else {
+      LOG.trace("stopMaintenance: Node {} in {}, nothing to do.",
+          node, node.getAdminState());
+    }
+  }
+
   private void setDecommissioned(DatanodeDescriptor dn) {
     dn.setDecommissioned();
     LOG.info("Decommissioning complete for node {}", dn);
@@ -313,7 +359,7 @@ public class DecommissionManager {
 
   @VisibleForTesting
   public int getNumTrackedNodes() {
-    return decomNodeBlocks.size();
+    return outOfServiceNodeBlocks.size();
   }
 
   @VisibleForTesting
@@ -333,8 +379,8 @@ public class DecommissionManager {
      */
     private final int numBlocksPerCheck;
     /**
-     * The maximum number of nodes to track in decomNodeBlocks. A value of 0
-     * means no limit.
+     * The maximum number of nodes to track in outOfServiceNodeBlocks.
+     * A value of 0 means no limit.
     */
    private final int maxConcurrentTrackedNodes;
    /**
     * The number of blocks that have been checked on this tick.
     */
    private int numNodesChecked = 0;
    /**
-    * The last datanode in decomNodeBlocks that we've processed
+    * The last datanode in outOfServiceNodeBlocks that we've processed
     */
    private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
        DatanodeID("", "", "", 0, 0, 0, 0));
@@ -393,14 +439,15 @@ public class DecommissionManager {
    private void processPendingNodes() {
      while (!pendingNodes.isEmpty() &&
          (maxConcurrentTrackedNodes == 0 ||
-          decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
-        decomNodeBlocks.put(pendingNodes.poll(), null);
+          outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+        outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
      }
    }
 
    private void check() {
      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
-         it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
+         it = new CyclicIteration<>(outOfServiceNodeBlocks,
+             iterkey).iterator();
      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
 
      while (it.hasNext() && !exceededNumBlocksPerCheck()) {
@@ -410,6 +457,17 @@ public class DecommissionManager {
        final DatanodeDescriptor dn = entry.getKey();
        AbstractList<BlockInfo> blocks = entry.getValue();
        boolean fullScan = false;
+       if (dn.isMaintenance()) {
+         // TODO HDFS-9390 make sure blocks are minimally replicated
+         // before transitioning the node to IN_MAINTENANCE state.
+
+         // If maintenance expires, stop tracking it.
+         if (dn.maintenanceExpired()) {
+           stopMaintenance(dn);
+           toRemove.add(dn);
+         }
+         continue;
+       }
        if (blocks == null) {
          // This is a newly added datanode, run through its list to schedule
          // under-replicated blocks for replication and collect the blocks
          // that are insufficiently replicated for further tracking
          LOG.debug("Newly-added node {}, doing full scan to find " +
              "insufficiently-replicated blocks.", dn);
          blocks = handleInsufficientlyStored(dn);
-         decomNodeBlocks.put(dn, blocks);
+         outOfServiceNodeBlocks.put(dn, blocks);
          fullScan = true;
        } else {
          // This is a known datanode, check if its # of insufficiently
@@ -436,7 +494,7 @@ public class DecommissionManager {
            LOG.debug("Node {} has finished replicating current set of " +
                "blocks, checking with the full block map.", dn);
            blocks = handleInsufficientlyStored(dn);
-           decomNodeBlocks.put(dn, blocks);
+           outOfServiceNodeBlocks.put(dn, blocks);
          }
          // If the full scan is clean AND the node liveness is okay,
          // we can finally mark as decommissioned.
@@ -460,11 +518,12 @@ public class DecommissionManager {
        }
        iterkey = dn;
      }
-     // Remove the datanodes that are decommissioned
+     // Remove the datanodes that are decommissioned or in service after
+     // maintenance expiration.
      for (DatanodeDescriptor dn : toRemove) {
-       Preconditions.checkState(dn.isDecommissioned(),
-           "Removing a node that is not yet decommissioned!");
-       decomNodeBlocks.remove(dn);
+       Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
+           "Removing a node that is not yet decommissioned or in service!");
+       outOfServiceNodeBlocks.remove(dn);
      }
    }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index cec4a1a..d728ee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -265,6 +265,33 @@ class HeartbeatManager implements DatanodeStatistics {
     }
   }
 
+  synchronized void startMaintenance(final DatanodeDescriptor node) {
+    if (!node.isAlive()) {
+      LOG.info("Dead node {} is put in maintenance state immediately.", node);
+      node.setInMaintenance();
+    } else if (node.isDecommissioned()) {
+      LOG.info("Decommissioned node " + node + " is put in maintenance state"
+          + " immediately.");
+      node.setInMaintenance();
+    } else {
+      stats.subtract(node);
+      node.startMaintenance();
+      stats.add(node);
+    }
+  }
+
+  synchronized void stopMaintenance(final DatanodeDescriptor node) {
+    LOG.info("Stopping maintenance of {} node {}",
+        node.isAlive() ? "live" : "dead", node);
+    if (!node.isAlive()) {
+      node.stopMaintenance();
+    } else {
+      stats.subtract(node);
+      node.stopMaintenance();
+      stats.add(node);
+    }
+  }
+
   synchronized void stopDecommission(final DatanodeDescriptor node) {
     LOG.info("Stopping decommissioning of {} node {}",
         node.isAlive() ? "live" : "dead", node);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
index f28ed29..0ab4ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
@@ -77,4 +77,11 @@ public abstract class HostConfigManager implements Configurable {
    * @return the upgrade domain of dn.
    */
   public abstract String getUpgradeDomain(DatanodeID dn);
+
+  /**
+   * Get the maintenance expiration time in milliseconds.
+   * @param dn the DatanodeID of the datanode
+   * @return the maintenance expiration time of dn.
+   */
+  public abstract long getMaintenanceExpirationTimeInMS(DatanodeID dn);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
index bcfebf2..59f907f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
@@ -138,6 +138,12 @@ public class HostFileManager extends HostConfigManager {
     return null;
   }
 
+  @Override
+  public long getMaintenanceExpirationTimeInMS(DatanodeID dn) {
+    // The include/exclude files based config doesn't support maintenance mode.
+    return 0;
+  }
+
   /**
    * Read the includes and excludes lists from the named files. Any previous
    * includes and excludes lists are discarded.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 52fbaa7..f4b742e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7079,5 +7079,34 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return blockManager.getBytesInFuture();
   }
 
+
+  @Override // FSNamesystemMBean
+  public int getNumInMaintenanceLiveDataNodes() {
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
+    int liveInMaintenance = 0;
+    for (DatanodeDescriptor node : live) {
+      liveInMaintenance += node.isInMaintenance() ? 1 : 0;
+    }
+    return liveInMaintenance;
+  }
+
+  @Override // FSNamesystemMBean
+  public int getNumInMaintenanceDeadDataNodes() {
+    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
+    int deadInMaintenance = 0;
+    for (DatanodeDescriptor node : dead) {
+      deadInMaintenance += node.isInMaintenance() ?
+          1 : 0;
+    }
+    return deadInMaintenance;
+  }
+
+  @Override // FSNamesystemMBean
+  public int getNumEnteringMaintenanceDataNodes() {
+    return getBlockManager().getDatanodeManager().getEnteringMaintenanceNodes()
+        .size();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
index b314f7f..f1e7515 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
@@ -208,4 +208,19 @@ public interface FSNamesystemMBean {
    * Return total time spent doing sync operations on FSEditLog.
    */
   String getTotalSyncTimes();
+
+  /**
+   * @return Number of IN_MAINTENANCE live data nodes
+   */
+  int getNumInMaintenanceLiveDataNodes();
+
+  /**
+   * @return Number of IN_MAINTENANCE dead data nodes
+   */
+  int getNumInMaintenanceDeadDataNodes();
+
+  /**
+   * @return Number of ENTERING_MAINTENANCE data nodes
+   */
+  int getNumEnteringMaintenanceDataNodes();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
new file mode 100644
index 0000000..0698628
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * This class provides utilities for testing the admin operations of nodes.
+ */
+public class AdminStatesBaseTest {
+  public static final Log LOG = LogFactory.getLog(AdminStatesBaseTest.class);
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  static final int fileSize = 16384;
+  static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
+  static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
+  static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
+
+  final private Random myrand = new Random();
+
+  private HostsFileWriter hostsFileWriter;
+  private Configuration conf;
+  private MiniDFSCluster cluster = null;
+  private boolean useCombinedHostFileManager = false;
+
+  protected void setUseCombinedHostFileManager() {
+    useCombinedHostFileManager = true;
+  }
+
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  protected MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    // Set up the hosts/exclude files.
+    hostsFileWriter = new HostsFileWriter();
+    conf = new HdfsConfiguration();
+
+    if (useCombinedHostFileManager) {
+      conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+          CombinedHostFileManager.class, HostConfigManager.class);
+    }
+
+    // Setup conf
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        200);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        BLOCKREPORT_INTERVAL_MSEC);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+        NAMENODE_REPLICATION_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
+
+    hostsFileWriter.initialize(conf, "temp/admin");
+  }
+
+  @After
+  public void teardown() throws IOException {
+    hostsFileWriter.cleanup();
+    shutdownCluster();
+  }
+
+  protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains the given number of blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, blockSize);
+    byte[] buffer = new byte[blockSize*numOfBlocks];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Decommission the DN, or put the DN into maintenance, for datanodeUuid or
+   * one random node if datanodeUuid is null, and wait for the node to reach
+   * the given {@code waitForState}.
+   */
+  protected DatanodeInfo takeNodeOutofService(int nnIndex,
+      String datanodeUuid, long maintenanceExpirationInMS,
+      ArrayList<DatanodeInfo> decommissionedNodes,
+      AdminStates waitForState) throws IOException {
+    return takeNodeOutofService(nnIndex, datanodeUuid,
+        maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
+  }
+
+  /*
+   * Decommission the DN, or put the DN into maintenance, as set by
+   * datanodeUuid; pick a random node if datanodeUuid == null, and wait for
+   * the node to reach the given {@code waitForState}.
+   */
+  protected DatanodeInfo takeNodeOutofService(int nnIndex,
+      String datanodeUuid, long maintenanceExpirationInMS,
+      List<DatanodeInfo> decommissionedNodes,
+      Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
+      throws IOException {
+    DFSClient client = getDfsClient(nnIndex);
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
+    boolean isDecommissionRequest =
+        waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
+        waitForState == AdminStates.DECOMMISSIONED;
+
+    //
+    // pick one datanode randomly unless the caller specifies one.
+    //
+    int index = 0;
+    if (datanodeUuid == null) {
+      boolean found = false;
+      while (!found) {
+        index = myrand.nextInt(info.length);
+        if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
+            (!isDecommissionRequest && !info[index].isInMaintenance())) {
+          found = true;
+        }
+      }
+    } else {
+      // The caller specifies a DN
+      for (; index < info.length; index++) {
+        if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
+          break;
+        }
+      }
+      if (index == info.length) {
+        throw new IOException("invalid datanodeUuid " + datanodeUuid);
+      }
+    }
+    String nodename = info[index].getXferAddr();
+    LOG.info("Taking node: " + nodename + " out of service");
+
+    ArrayList<String> decommissionNodes = new ArrayList<String>();
+    if (decommissionedNodes != null) {
+      for (DatanodeInfo dn : decommissionedNodes) {
+        decommissionNodes.add(dn.getName());
+      }
+    }
+    Map<String, Long> maintenanceNodes = new HashMap<>();
+    if (inMaintenanceNodes != null) {
+      for (Map.Entry<DatanodeInfo, Long> dn :
+          inMaintenanceNodes.entrySet()) {
+        maintenanceNodes.put(dn.getKey().getName(), dn.getValue());
+      }
+    }
+
+    if (isDecommissionRequest) {
+      decommissionNodes.add(nodename);
+    } else {
+      maintenanceNodes.put(nodename, maintenanceExpirationInMS);
+    }
+
+    // write node names into the json host file.
+    hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
+    refreshNodes(nnIndex);
+    DatanodeInfo ret = NameNodeAdapter.getDatanode(
+        cluster.getNamesystem(nnIndex), info[index]);
+    waitNodeState(ret, waitForState);
+    return ret;
+  }
+
+  /* Ask a specific NN to put the datanode in service and wait for it
+   * to reach the NORMAL state.
+   */
+  protected void putNodeInService(int nnIndex,
+      DatanodeInfo outOfServiceNode) throws IOException {
+    LOG.info("Putting node: " + outOfServiceNode + " in service");
+    ArrayList<String> decommissionNodes = new ArrayList<>();
+    Map<String, Long> maintenanceNodes = new HashMap<>();
+
+    DatanodeManager dm =
+        cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager();
+    List<DatanodeDescriptor> nodes =
+        dm.getDatanodeListForReport(DatanodeReportType.ALL);
+    for (DatanodeDescriptor node : nodes) {
+      if (node.isMaintenance()) {
+        maintenanceNodes.put(node.getName(),
+            node.getMaintenanceExpireTimeInMS());
+      } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+        decommissionNodes.add(node.getName());
+      }
+    }
+    decommissionNodes.remove(outOfServiceNode.getName());
+    maintenanceNodes.remove(outOfServiceNode.getName());
+
+    hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
+    refreshNodes(nnIndex);
+    waitNodeState(outOfServiceNode, AdminStates.NORMAL);
+  }
+
+  protected void putNodeInService(int nnIndex,
+      String datanodeUuid) throws IOException {
+    DatanodeInfo datanodeInfo =
+        getDatanodeDesriptor(cluster.getNamesystem(nnIndex), datanodeUuid);
+    putNodeInService(nnIndex, datanodeInfo);
+  }
+
+  /*
+   * Wait till node is transitioned to the expected state.
+   */
+  protected void waitNodeState(DatanodeInfo node,
+      AdminStates state) {
+    boolean done = state == node.getAdminState();
+    while (!done) {
+      LOG.info("Waiting for node " + node + " to change state to "
+          + state + " current state: " + node.getAdminState());
+      try {
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = state == node.getAdminState();
+    }
+    LOG.info("node " + node + " reached the state " + state);
+  }
+
+  protected void initIncludeHost(String hostNameAndPort) throws IOException {
+    hostsFileWriter.initIncludeHost(hostNameAndPort);
+  }
+
+  protected void initIncludeHosts(String[] hostNameAndPorts)
+      throws IOException {
+    hostsFileWriter.initIncludeHosts(hostNameAndPorts);
+  }
+
+  protected void initExcludeHost(String hostNameAndPort) throws IOException {
+    hostsFileWriter.initExcludeHost(hostNameAndPort);
+  }
+
+  protected void initExcludeHosts(List<String> hostNameAndPorts)
+      throws IOException {
+    hostsFileWriter.initExcludeHosts(hostNameAndPorts);
+  }
+
+  /* Get DFSClient to the namenode */
+  protected DFSClient getDfsClient(final int nnIndex) throws IOException {
+    return new DFSClient(cluster.getNameNode(nnIndex).getNameNodeAddress(),
+        conf);
+  }
+
+  /* Validate cluster has expected number of datanodes */
+  protected static void validateCluster(DFSClient client, int numDNs)
+      throws IOException {
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+    assertEquals("Number of Datanodes ", numDNs, info.length);
+  }
+
+  /** Start a MiniDFSCluster.
+   * @throws IOException */
+  protected void startCluster(int numNameNodes, int numDatanodes,
+      boolean setupHostsFile, long[] nodesCapacity,
+      boolean checkDataNodeHostConfig) throws IOException {
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
+        .numDataNodes(numDatanodes);
+    if (setupHostsFile) {
+      builder.setupHostsFile(setupHostsFile);
+    }
+    if (nodesCapacity != null) {
+      builder.simulatedCapacities(nodesCapacity);
+    }
+    if (checkDataNodeHostConfig) {
+      builder.checkDataNodeHostConfig(checkDataNodeHostConfig);
+    }
+    cluster = builder.build();
+    cluster.waitActive();
+    for (int i = 0; i < numNameNodes; i++) {
+      DFSClient client = getDfsClient(i);
+      validateCluster(client, numDatanodes);
+    }
+  }
+
+  protected void startCluster(int numNameNodes, int numDatanodes)
+      throws IOException {
+    startCluster(numNameNodes, numDatanodes, false, null, false);
+  }
+
+  protected void startSimpleHACluster(int numDatanodes) throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(
+            numDatanodes).build();
+    cluster.transitionToActive(0);
+    cluster.waitActive();
+  }
+
+  protected void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  protected void refreshNodes(final int nnIndex) throws IOException {
+    cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager().
+        refreshNodes(conf);
+  }
+
+  protected DatanodeDescriptor getDatanodeDesriptor(
+      final FSNamesystem ns, final String datanodeUuid) {
+    return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
+  }
+
+  protected void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name, true);
+    assertTrue(!fileSys.exists(name));
+  }
+}
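As a closing illustration of how these test utilities compose, here is a hypothetical test modeled on TestMaintenanceState, which is part of this commit but not included in this abbreviated mail; the class name, test name, and timeout are assumptions:

package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.junit.Test;

public class TestMaintenanceStateSketch extends AdminStatesBaseTest {

  public TestMaintenanceStateSketch() {
    // Maintenance state requires the JSON-based hosts file.
    setUseCombinedHostFileManager();
  }

  @Test(timeout = 360000)
  public void testEnterAndLeaveMaintenance() throws Exception {
    startCluster(1, 3); // 1 NameNode, 3 DataNodes

    // Put one random DataNode into maintenance with no expiration
    // (Long.MAX_VALUE is the "never expires" default).
    DatanodeInfo dn = takeNodeOutofService(0, null, Long.MAX_VALUE,
        null, AdminStates.ENTERING_MAINTENANCE);

    // Rewrite the hosts file without the node and wait for NORMAL.
    putNodeInService(0, dn);

    shutdownCluster(); // teardown() would also do this
  }
}

Note that in this commit a live node stays in ENTERING_MAINTENANCE; the transition to IN_MAINTENANCE once its blocks are minimally replicated is deferred to HDFS-9390 (see the TODO comments in DecommissionManager above).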