Subject: svn commit: r1060619 - in /hadoop/hdfs/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/...
Date: Tue, 18 Jan 2011 23:24:11 -0000
To: hdfs-commits@hadoop.apache.org
From: suresh@apache.org
Reply-To: hdfs-dev@hadoop.apache.org

Author: suresh
Date: Tue Jan 18 23:24:10 2011
New Revision: 1060619

URL: http://svn.apache.org/viewvc?rev=1060619&view=rev
Log:
HDFS-1547. Improve decommission mechanism. Contributed by Suresh Srinivas.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jan 18 23:24:10 2011
@@ -36,6 +36,8 @@ Trunk (unreleased changes)
     HDFS-1539. A config option for the datanode to fsycn a block file when block is completely written. (dhruba)

+    HDFS-1547. Improve decommission mechanism. (suresh)
+
   OPTIMIZATIONS

     HDFS-1458.
Improve checkpoint performance by avoiding unnecessary image Modified: hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original) +++ hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Tue Jan 18 23:24:10 2011 @@ -209,14 +209,16 @@
  • -refreshNodes - : Updates the set of hosts allowed to connect to namenode. - Re-reads the config file to update values defined by dfs.hosts and - dfs.host.exclude and reads the entires (hostnames) in those files. - Each entry not defined in dfs.hosts but in dfs.hosts.exclude - is decommissioned. Each entry defined in dfs.hosts and also in - dfs.host.exclude is stopped from decommissioning if it has aleady - been marked for decommission. Entires not present in both the lists - are decommissioned. + : Updates the namenode with the set of datanodes allowed to + connect to the namenode. Namenodes re-read datanode hostnames + in the file defined by dfs.hosts, dfs.hosts.exclude. Hosts defined + in dfs.hosts are the datanodes that are part of the cluster. + If there are entries in dfs.hosts, only the hosts in it are + allowed to register with the namenode. Entries in dfs.hosts.exclude + are datanodes that need to be decommissioned. Datanodes complete + decommissioning when all the replicas from them are replicated + to other datanodes. Decommissioned nodes are not automatically + shutdown and are not chosen for writing new replicas.
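The user-guide text above reduces to a per-datanode decision driven by the two lists. As a rough illustration only — using hypothetical stand-in types rather than the real FSNamesystem/DatanodeDescriptor classes — the decision made on a -refreshNodes looks roughly like this:

import java.util.Set;

/** Hypothetical stand-in for the namenode's per-datanode bookkeeping. */
interface Node {
  String hostName();
  void setDisallowed(boolean flag); // node may no longer talk to the namenode
  void startDecommission();         // begin draining its replicas to other nodes
  void stopDecommission();          // cancel decommissioning, back to normal service
}

class RefreshNodesSketch {
  /**
   * Re-apply the include/exclude lists to one registered datanode,
   * following the cases described in the user guide text above.
   */
  static void refresh(Node node, Set<String> includes, Set<String> excludes) {
    boolean included = includes.isEmpty() || includes.contains(node.hostName());
    if (!included) {
      node.setDisallowed(true);     // not in dfs.hosts: may not register
    } else if (excludes.contains(node.hostName())) {
      node.startDecommission();     // in dfs.hosts.exclude: decommission it
    } else {
      node.stopDecommission();      // in neither list: keep it in service
    }
  }
}

The empty-include-list case mirrors the documented behavior: when dfs.hosts has no entries, every datanode is treated as included and only dfs.hosts.exclude matters.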
  • -printTopology Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Jan 18 23:24:10 2011 @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs; import java.io.UnsupportedEncodingException; +import java.util.Comparator; import java.util.StringTokenizer; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,6 +33,20 @@ import org.apache.hadoop.net.NodeBase; @InterfaceAudience.Private public class DFSUtil { /** + * Comparator for sorting DatanodeInfo[] based on decommissioned states. + * Decommissioned nodes are moved to the end of the array on sorting with + * this comparator. + */ + public static final Comparator<DatanodeInfo> DECOM_COMPARATOR = + new Comparator<DatanodeInfo>() { + @Override + public int compare(DatanodeInfo a, DatanodeInfo b) { + return a.isDecommissioned() == b.isDecommissioned() ? 0 : + a.isDecommissioned() ? 1 : -1; + } + }; + + /** * Whether the pathname is valid. Currently prohibits relative paths, * and names which contain a ":" or "/" */ Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jan 18 23:24:10 2011 @@ -59,7 +59,22 @@ public class DatanodeInfo extends Datano protected String hostName = null; // administrative states of a datanode - public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; } + public enum AdminStates { + NORMAL("In Service"), + DECOMMISSION_INPROGRESS("Decommission In Progress"), + DECOMMISSIONED("Decommissioned"); + + final String value; + + AdminStates(final String v) { + this.value = v; + } + + public String toString() { + return value; + } + } + @Nullable protected AdminStates adminState; @@ -274,7 +289,7 @@ public class DatanodeInfo extends Datano /** * Retrieves the admin state of this node. */ - AdminStates getAdminState() { + public AdminStates getAdminState() { if (adminState == null) { return AdminStates.NORMAL; } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Jan 18 23:24:10 2011 @@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -262,6 +263,8 @@ public class JspHelper { FIELD_NONDFS_USED = 7, FIELD_REMAINING = 8, FIELD_PERCENT_REMAINING = 9, + FIELD_ADMIN_STATE = 10, + FIELD_DECOMMISSIONED = 11, SORT_ORDER_ASC = 1, SORT_ORDER_DSC = 2; @@ -285,6 +288,10 @@ public class JspHelper { sortField = FIELD_PERCENT_REMAINING; } else if (field.equals("blocks")) { sortField = FIELD_BLOCKS; + } else if (field.equals("adminstate")) { + sortField = FIELD_ADMIN_STATE; + } else if (field.equals("decommissioned")) { + sortField = FIELD_DECOMMISSIONED; } else { sortField = FIELD_NAME; } @@ -332,6 +339,13 @@ public class JspHelper { case FIELD_BLOCKS: ret = d1.numBlocks() - d2.numBlocks(); break; + case FIELD_ADMIN_STATE: + ret = d1.getAdminState().toString().compareTo( + d2.getAdminState().toString()); + break; + case FIELD_DECOMMISSIONED: + ret = DFSUtil.DECOM_COMPARATOR.compare(d1, d2); + break; case FIELD_NAME: ret = d1.getHostName().compareTo(d2.getHostName()); break; Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Tue Jan 18 23:24:10 2011 @@ -124,6 +124,12 @@ public class DatanodeDescriptor extends private long lastBlocksScheduledRollTime = 0; private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min private int volumeFailures = 0; + + /** + * When set to true, the node is not in include list and is not allowed + * to communicate with the namenode + */ + private boolean disallowed = false; /** Default constructor */ public DatanodeDescriptor() {} @@ -681,4 +687,16 @@ public class DatanodeDescriptor extends super.updateRegInfo(nodeReg); volumeFailures = 0; } + + /** + * Set the flag to indicate if this datanode is disallowed from communicating + * with the namenode. 
+ */ + void setDisallowed(boolean flag) { + disallowed = flag; + } + + boolean isDisallowed() { + return disallowed; + } } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jan 18 23:24:10 2011 @@ -779,6 +779,9 @@ public class FSNamesystem implements FSC clientMachine); for (LocatedBlock b : blocks.getLocatedBlocks()) { clusterMap.pseudoSortByDistance(client, b.getLocations()); + + // Move decommissioned datanodes to the bottom + Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR); } } return blocks; @@ -2559,6 +2562,7 @@ public class FSNamesystem implements FSC clusterMap.remove(nodeS); nodeS.updateRegInfo(nodeReg); nodeS.setHostName(hostName); + nodeS.setDisallowed(false); // Node is in the include list // resolve network location resolveNetworkLocation(nodeS); @@ -2573,6 +2577,7 @@ public class FSNamesystem implements FSC nodeS.isAlive = true; } } + checkDecommissioning(nodeS, dnAddress); return; } @@ -2593,7 +2598,8 @@ public class FSNamesystem implements FSC resolveNetworkLocation(nodeDescr); unprotectedAddDatanode(nodeDescr); clusterMap.add(nodeDescr); - + checkDecommissioning(nodeDescr, dnAddress); + // also treat the registration message as a heartbeat synchronized(heartbeats) { heartbeats.add(nodeDescr); @@ -2699,13 +2705,13 @@ public class FSNamesystem implements FSC } catch(UnregisteredNodeException e) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } - + // Check if this datanode should actually be shutdown instead. - if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) { + if (nodeinfo != null && nodeinfo.isDisallowed()) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } - + if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } @@ -2754,18 +2760,28 @@ public class FSNamesystem implements FSC private void updateStats(DatanodeDescriptor node, boolean isAdded) { // // The statistics are protected by the heartbeat lock + // For decommissioning/decommissioned nodes, only used capacity + // is counted. // assert(Thread.holdsLock(heartbeats)); if (isAdded) { - capacityTotal += node.getCapacity(); capacityUsed += node.getDfsUsed(); - capacityRemaining += node.getRemaining(); totalLoad += node.getXceiverCount(); + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + capacityTotal += node.getCapacity(); + capacityRemaining += node.getRemaining(); + } else { + capacityTotal += node.getDfsUsed(); + } } else { - capacityTotal -= node.getCapacity(); capacityUsed -= node.getDfsUsed(); - capacityRemaining -= node.getRemaining(); totalLoad -= node.getXceiverCount(); + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + capacityTotal -= node.getCapacity(); + capacityRemaining -= node.getRemaining(); + } else { + capacityTotal -= node.getDfsUsed(); + } } } @@ -3072,12 +3088,6 @@ public class FSNamesystem implements FSC + nodeID.getName()); } - // Check if this datanode should actually be shutdown instead. 
- if (shouldNodeShutdown(node)) { - setDatanodeDead(node); - throw new DisallowedDatanodeException(node); - } - blockManager.processReport(node, newReport); NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime)); } finally { @@ -3209,12 +3219,6 @@ public class FSNamesystem implements FSC +block+" is received from " + nodeID.getName()); } - // Check if this datanode should actually be shutdown instead. - if (shouldNodeShutdown(node)) { - setDatanodeDead(node); - throw new DisallowedDatanodeException(node); - } - blockManager.addBlock(node, block, delHint); } finally { writeUnlock(); @@ -3471,12 +3475,16 @@ public class FSNamesystem implements FSC throws IOException { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { - LOG.info("Start Decommissioning node " + node.getName()); - node.startDecommission(); + LOG.info("Start Decommissioning node " + node.getName() + " with " + + node.numBlocks() + " blocks."); + synchronized (heartbeats) { + updateStats(node, false); + node.startDecommission(); + updateStats(node, true); + } node.decommissioningStatus.setStartTime(now()); - // - // all the blocks that reside on this node have to be - // replicated. + + // all the blocks that reside on this node have to be replicated. checkDecommissionStateInternal(node); } } @@ -3486,8 +3494,14 @@ public class FSNamesystem implements FSC */ public void stopDecommission (DatanodeDescriptor node) throws IOException { - LOG.info("Stop Decommissioning node " + node.getName()); - node.stopDecommission(); + if (node.isDecommissionInProgress() || node.isDecommissioned()) { + LOG.info("Stop Decommissioning node " + node.getName()); + synchronized (heartbeats) { + updateStats(node, false); + node.stopDecommission(); + updateStats(node, true); + } + } } /** @@ -3574,10 +3588,7 @@ public class FSNamesystem implements FSC LOG.info("Decommission complete for node " + node.getName()); } } - if (node.isDecommissioned()) { - return true; - } - return false; + return node.isDecommissioned(); } /** @@ -3627,18 +3638,12 @@ public class FSNamesystem implements FSC DatanodeDescriptor node = it.next(); // Check if not include. if (!inHostsList(node, null)) { - node.setDecommissioned(); // case 2. + node.setDisallowed(true); // case 2. } else { if (inExcludedHostsList(node, null)) { - if (!node.isDecommissionInProgress() && - !node.isDecommissioned()) { - startDecommission(node); // case 3. - } + startDecommission(node); // case 3. } else { - if (node.isDecommissionInProgress() || - node.isDecommissioned()) { - stopDecommission(node); // case 4. - } + stopDecommission(node); // case 4. } } } @@ -3654,39 +3659,22 @@ public class FSNamesystem implements FSC /** * Checks if the node is not on the hosts list. If it is not, then - * it will be ignored. If the node is in the hosts list, but is also - * on the exclude list, then it will be decommissioned. - * Returns FALSE if node is rejected for registration. - * Returns TRUE if node is registered (including when it is on the - * exclude list and is being decommissioned). + * it will be disallowed from registering. 
*/ - private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) - throws IOException { + private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) { assert (hasWriteLock()); - if (!inHostsList(nodeReg, ipAddr)) { - return false; - } - if (inExcludedHostsList(nodeReg, ipAddr)) { - DatanodeDescriptor node = getDatanode(nodeReg); - if (node == null) { - throw new IOException("verifyNodeRegistration: unknown datanode " + - nodeReg.getName()); - } - if (!checkDecommissionStateInternal(node)) { - startDecommission(node); - } - } - return true; + return inHostsList(nodeReg, ipAddr); } /** - * Checks if the Admin state bit is DECOMMISSIONED. If so, then - * we should shut it down. - * - * Returns true if the node should be shutdown. + * Decommission the node if it is in exclude list. */ - private boolean shouldNodeShutdown(DatanodeDescriptor node) { - return (node.isDecommissioned()); + private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) + throws IOException { + // If the registered node is in exclude list, then decommission it + if (inExcludedHostsList(nodeReg, ipAddr)) { + startDecommission(nodeReg); + } } /** @@ -4941,6 +4929,9 @@ public class FSNamesystem implements FSC } } + /** + * @return list of datanodes where decommissioning is in progress + */ public ArrayList getDecommissioningNodes() { readLock(); try { @@ -4960,10 +4951,9 @@ public class FSNamesystem implements FSC } } - /* - * Delegation Token + /** + * Create delegation token secret manager */ - private DelegationTokenSecretManager createDelegationTokenSecretManager( Configuration conf) { return new DelegationTokenSecretManager(conf.getLong( @@ -5288,6 +5278,7 @@ public class FSNamesystem implements FSC final Map innerinfo = new HashMap(); innerinfo.put("lastContact", getLastContact(node)); innerinfo.put("usedSpace", getDfsUsed(node)); + innerinfo.put("adminState", node.getAdminState().toString()); info.put(node.getHostName(), innerinfo); } return JSON.toString(info); @@ -5305,6 +5296,7 @@ public class FSNamesystem implements FSC for (DatanodeDescriptor node : deadNodeList) { final Map innerinfo = new HashMap(); innerinfo.put("lastContact", getLastContact(node)); + innerinfo.put("decommissioned", node.isDecommissioned()); info.put(node.getHostName(), innerinfo); } return JSON.toString(info); Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jan 18 23:24:10 2011 @@ -207,7 +207,17 @@ class NamenodeJspHelper { ArrayList live = new ArrayList(); ArrayList dead = new ArrayList(); fsn.DFSNodesStatus(live, dead); + + int liveDecommissioned = 0; + for (DatanodeDescriptor d : live) { + liveDecommissioned += d.isDecommissioned() ? 1 : 0; + } + int deadDecommissioned = 0; + for (DatanodeDescriptor d : dead) { + deadDecommissioned += d.isDecommissioned() ? 
1 : 0; + } + ArrayList decommissioning = fsn .getDecommissioningNodes(); @@ -292,9 +302,13 @@ class NamenodeJspHelper { + colTxt() + StringUtils.limitDecimalTo2(dev) + " %" + rowTxt() + colTxt() + "Live Nodes " - + colTxt() + ":" + colTxt() + live.size() + rowTxt() + colTxt() + + colTxt() + ":" + colTxt() + live.size() + + " (Decommissioned: " + liveDecommissioned + ")" + + rowTxt() + colTxt() + "Dead Nodes " - + colTxt() + ":" + colTxt() + dead.size() + rowTxt() + colTxt() + + colTxt() + ":" + colTxt() + dead.size() + + " (Decommissioned: " + deadDecommissioned + ")" + + rowTxt() + colTxt() + "" + "Decommissioning Nodes " + colTxt() + ":" + colTxt() + decommissioning.size() @@ -446,8 +460,11 @@ class NamenodeJspHelper { */ generateNodeDataHeader(out, d, suffix, alive, nnHttpPort); - if (!alive) + if (!alive) { + out.print(" " + + d.isDecommissioned() + "\n"); return; + } long c = d.getCapacity(); long u = d.getDfsUsed(); @@ -457,9 +474,7 @@ class NamenodeJspHelper { String percentRemaining = StringUtils.limitDecimalTo2(d .getRemainingPercent()); - String adminState = (d.isDecommissioned() ? "Decommissioned" : (d - .isDecommissionInProgress() ? "Decommission In Progress" - : "In Service")); + String adminState = d.getAdminState().toString(); long timestamp = d.getLastUpdate(); long currentTime = System.currentTimeMillis(); @@ -502,7 +517,6 @@ class NamenodeJspHelper { sorterOrder = "ASC"; JspHelper.sortNodeList(live, sorterField, sorterOrder); - JspHelper.sortNodeList(dead, "name", "ASC"); // Find out common suffix. Should this be before or after the sort? String port_suffix = null; @@ -535,7 +549,6 @@ class NamenodeJspHelper { int nnHttpPort = nn.getHttpAddress().getPort(); out.print("
    "); if (whatNodes.equals("LIVE")) { - out.print("" + "Live Datanodes : " + live.size() + "" + "

    \n\n"); @@ -577,9 +590,11 @@ class NamenodeJspHelper { if (dead.size() > 0) { out.print("
    " - + "
    Node \n"); + + " Node Decommissioned\n"); - JspHelper.sortNodeList(dead, "name", "ASC"); + JspHelper.sortNodeList(dead, sorterField, sorterOrder); for (int i = 0; i < dead.size(); i++) { generateNodeData(out, dead.get(i), port_suffix, false, nnHttpPort); } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Tue Jan 18 23:24:10 2011 @@ -501,16 +501,18 @@ public class DFSAdmin extends FsShell { "Set/Unset/Check flag to attempt restore of failed storage replicas if they become available.\n" + "\t\tRequires superuser permissions.\n"; - String refreshNodes = "-refreshNodes: \tUpdates the set of hosts allowed " + - "to connect to namenode.\n\n" + - "\t\tRe-reads the config file to update values defined by \n" + - "\t\tdfs.hosts and dfs.host.exclude and reads the \n" + - "\t\tentires (hostnames) in those files.\n\n" + - "\t\tEach entry not defined in dfs.hosts but in \n" + - "\t\tdfs.hosts.exclude is decommissioned. Each entry defined \n" + - "\t\tin dfs.hosts and also in dfs.host.exclude is stopped from \n" + - "\t\tdecommissioning if it has aleady been marked for decommission.\n" + - "\t\tEntires not present in both the lists are decommissioned.\n"; + String refreshNodes = "-refreshNodes: \tUpdates the namenode with the " + + "set of datanodes allowed to connect to the namenode.\n\n" + + "\t\tNamenode re-reads datanode hostnames from the file defined by \n" + + "\t\tdfs.hosts, dfs.hosts.exclude configuration parameters.\n" + + "\t\tHosts defined in dfs.hosts are the datanodes that are part of \n" + + "\t\tthe cluster. If there are entries in dfs.hosts, only the hosts \n" + + "\t\tin it are allowed to register with the namenode.\n\n" + + "\t\tEntries in dfs.hosts.exclude are datanodes that need to be \n" + + "\t\tdecommissioned. 
Datanodes complete decommissioning when \n" + + "\t\tall the replicas from them are replicated to other datanodes.\n" + + "\t\tDecommissioned nodes are not automatically shutdown and \n" + + "\t\tare not chosen for writing new replicas.\n"; String finalizeUpgrade = "-finalizeUpgrade: Finalize upgrade of HDFS.\n" + "\t\tDatanodes delete their previous version working directories,\n" + Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml Tue Jan 18 23:24:10 2011 @@ -15899,39 +15899,47 @@ RegexpComparator - ^-refreshNodes:( |\t)*Updates the set of hosts allowed to connect to namenode.( )* + ^-refreshNodes:( |\t)*Updates the namenode with the set of datanodes allowed to connect to the namenode.( )* RegexpComparator - ^( |\t)*Re-reads the config file to update values defined by( )* + ^( |\t)*Namenode re-reads datanode hostnames from the file defined by( )* RegexpComparator - ^( |\t)*dfs.hosts and dfs.host.exclude and reads the( )* + ^( |\t)*dfs.hosts, dfs.hosts.exclude configuration parameters.( )* RegexpComparator - ^( |\t)*entires \(hostnames\) in those files.( )* + ^( |\t)*Hosts defined in dfs.hosts are the datanodes that are part of( )* RegexpComparator - ^( |\t)*Each entry not defined in dfs.hosts but in( )* + ^( |\t)*the cluster. If there are entries in dfs.hosts, only the hosts( )* RegexpComparator - ^( |\t)*dfs.hosts.exclude is decommissioned. Each entry defined( )* + ^( |\t)*in it are allowed to register with the namenode.( )* RegexpComparator - ^( |\t)*in dfs.hosts and also in dfs.host.exclude is stopped from( )* + ^( |\t)*Entries in dfs.hosts.exclude are datanodes that need to be( )* RegexpComparator - ^( |\t)*decommissioning if it has aleady been marked for decommission.( )* + ^( |\t)*decommissioned. 
Datanodes complete decommissioning when ( )* RegexpComparator - ^( |\t)*Entires not present in both the lists are decommissioned.( )* + ^( |\t)*all the replicas from them are replicated to other datanodes.( )* + + + RegexpComparator + ^( |\t)*Decommissioned nodes are not automatically shutdown and( )* + + + RegexpComparator + ^( |\t)*are not chosen for writing new replicas.( )* Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jan 18 23:24:10 2011 @@ -20,9 +20,12 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.io.RandomAccessFile; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; import java.nio.channels.FileChannel; @@ -86,6 +89,7 @@ public class MiniDFSCluster { private long [] simulatedCapacities = null; // wait until namenode has left safe mode? private boolean waitSafeMode = true; + private boolean setupHostsFile = false; public Builder(Configuration conf) { this.conf = conf; @@ -172,6 +176,15 @@ public class MiniDFSCluster { } /** + * Default: false + * When true the hosts file/include file for the cluster is setup + */ + public Builder setupHostsFile(boolean val) { + this.setupHostsFile = val; + return this; + } + + /** * Construct the actual MiniDFSCluster */ public MiniDFSCluster build() throws IOException { @@ -193,7 +206,8 @@ public class MiniDFSCluster { builder.racks, builder.hosts, builder.simulatedCapacities, - builder.waitSafeMode); + builder.waitSafeMode, + builder.setupHostsFile); } public class DataNodeProperties { @@ -390,13 +404,14 @@ public class MiniDFSCluster { long[] simulatedCapacities) throws IOException { initMiniDFSCluster(nameNodePort, conf, numDataNodes, format, manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts, - simulatedCapacities, true); + simulatedCapacities, true, false); } private void initMiniDFSCluster(int nameNodePort, Configuration conf, int numDataNodes, boolean format, boolean manageNameDfsDirs, boolean manageDataDfsDirs, StartupOption operation, String[] racks, - String[] hosts, long[] simulatedCapacities, boolean waitSafeMode) + String[] hosts, long[] simulatedCapacities, boolean waitSafeMode, + boolean setupHostsFile) throws IOException { this.conf = conf; base_dir = new File(getBaseDirectory()); @@ -461,8 +476,8 @@ public class MiniDFSCluster { nameNode = NameNode.createNameNode(args, conf); // Start the DataNodes - startDataNodes(conf, numDataNodes, manageDataDfsDirs, - operation, racks, hosts, simulatedCapacities); + startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, + hosts, simulatedCapacities, setupHostsFile); waitClusterUp(); //make sure ProxyUsers uses the latest conf @@ -531,7 +546,8 @@ public class MiniDFSCluster { public synchronized void startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks, String[] hosts, - long[] simulatedCapacities) throws IOException { + long[] 
simulatedCapacities, + boolean setupHostsFile) throws IOException { int curDatanodesNum = dataNodes.size(); // for mincluster's the default initialDelay for BRs is 0 @@ -577,12 +593,6 @@ public class MiniDFSCluster { + "] is less than the number of datanodes [" + numDataNodes + "]."); } - // Set up the right ports for the datanodes - conf.set("dfs.datanode.address", "127.0.0.1:0"); - conf.set("dfs.datanode.http.address", "127.0.0.1:0"); - conf.set("dfs.datanode.ipc.address", "127.0.0.1:0"); - - String [] dnArgs = (operation == null || operation != StartupOption.ROLLBACK) ? null : new String[] {operation.getName()}; @@ -590,6 +600,8 @@ public class MiniDFSCluster { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { Configuration dnConf = new HdfsConfiguration(conf); + // Set up datanode address + setupDatanodeAddress(dnConf, setupHostsFile); if (manageDfsDirs) { File dir1 = new File(data_dir, "data"+(2*i+1)); File dir2 = new File(data_dir, "data"+(2*i+2)); @@ -671,7 +683,8 @@ public class MiniDFSCluster { boolean manageDfsDirs, StartupOption operation, String[] racks ) throws IOException { - startDataNodes( conf, numDataNodes, manageDfsDirs, operation, racks, null, null); + startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null, + null, false); } /** @@ -701,7 +714,7 @@ public class MiniDFSCluster { String[] racks, long[] simulatedCapacities) throws IOException { startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null, - simulatedCapacities); + simulatedCapacities, false); } /** @@ -1197,4 +1210,48 @@ public class MiniDFSCluster { public static String getBaseDirectory() { return System.getProperty("test.build.data", "build/test/data") + "/dfs/"; } + + private int getFreeSocketPort() { + int port = 0; + try { + ServerSocket s = new ServerSocket(0); + port = s.getLocalPort(); + s.close(); + return port; + } catch (IOException e) { + // Could not get a free port. Return default port 0. 
+ } + return port; + } + + private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile) throws IOException { + if (setupHostsFile) { + String hostsFile = conf.get("dfs.hosts", "").trim(); + if (hostsFile.length() == 0) { + throw new IOException("Parameter dfs.hosts is not setup in conf"); + } + // Setup datanode in the include file, if it is defined in the conf + String address = "127.0.0.1:" + getFreeSocketPort(); + conf.set("dfs.datanode.address", address); + addToFile(hostsFile, address); + LOG.info("Adding datanode " + address + " to hosts file " + hostsFile); + } else { + conf.set("dfs.datanode.address", "127.0.0.1:0"); + conf.set("dfs.datanode.http.address", "127.0.0.1:0"); + conf.set("dfs.datanode.ipc.address", "127.0.0.1:0"); + } + } + + private void addToFile(String p, String address) throws IOException { + File f = new File(p); + if (!f.exists()) { + f.createNewFile(); + } + PrintWriter writer = new PrintWriter(new FileWriter(f, true)); + try { + writer.println(address); + } finally { + writer.close(); + } + } } Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1060619&r1=1060618&r2=1060619&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Tue Jan 18 23:24:10 2011 @@ -24,8 +24,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.Random; -import junit.framework.TestCase; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; @@ -33,36 +33,68 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; /** * This class tests the decommissioning of nodes. */ -public class TestDecommission extends TestCase { +public class TestDecommission { + public static final Log LOG = LogFactory.getLog(TestDecommission.class); static final long seed = 0xDEADBEEFL; static final int blockSize = 8192; static final int fileSize = 16384; - static final int numDatanodes = 6; - + static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds Random myrand = new Random(); Path hostsFile; Path excludeFile; - - ArrayList decommissionedNodes = new ArrayList(numDatanodes); - - private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; } - - private void writeConfigFile(FileSystem fs, Path name, ArrayList nodes) + FileSystem localFileSys; + Configuration conf; + MiniDFSCluster cluster = null; + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + // Set up the hosts/exclude files. 
+ localFileSys = FileSystem.getLocal(conf); + Path workingDir = localFileSys.getWorkingDirectory(); + Path dir = new Path(workingDir, "build/test/data/work-dir/decommission"); + hostsFile = new Path(dir, "hosts"); + excludeFile = new Path(dir, "exclude"); + + // Setup conf + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); + conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath()); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4); + writeConfigFile(excludeFile, null); + } + + @After + public void teardown() throws IOException { + cleanupFile(localFileSys, excludeFile.getParent()); + if (cluster != null) { + cluster.shutdown(); + } + } + + private void writeConfigFile(Path name, ArrayList nodes) throws IOException { - // delete if it already exists - if (fs.exists(name)) { - fs.delete(name, true); + if (localFileSys.exists(name)) { + localFileSys.delete(name, true); } - FSDataOutputStream stm = fs.create(name); + FSDataOutputStream stm = localFileSys.create(name); if (nodes != null) { for (Iterator it = nodes.iterator(); it.hasNext();) { @@ -87,23 +119,17 @@ public class TestDecommission extends Te stm.close(); } - - private void checkFile(FileSystem fileSys, Path name, int repl) - throws IOException { - DFSTestUtil.waitReplication(fileSys, name, (short) repl); - } - private void printFileLocations(FileSystem fileSys, Path name) throws IOException { BlockLocation[] locations = fileSys.getFileBlockLocations( fileSys.getFileStatus(name), 0, fileSize); for (int idx = 0; idx < locations.length; idx++) { String[] loc = locations[idx].getHosts(); - System.out.print("Block[" + idx + "] : "); + StringBuilder buf = new StringBuilder("Block[" + idx + "] : "); for (int j = 0; j < loc.length; j++) { - System.out.print(loc[j] + " "); + buf.append(loc[j] + " "); } - System.out.println(""); + LOG.info(buf.toString()); } } @@ -112,7 +138,7 @@ public class TestDecommission extends Te * replication factor is 1 more than the specified one. */ private void checkFile(FileSystem fileSys, Path name, int repl, - String downnode) throws IOException { + String downnode, int numDatanodes) throws IOException { // // sleep an additional 10 seconds for the blockreports from the datanodes // to arrive. 
@@ -126,16 +152,24 @@ public class TestDecommission extends Te for (LocatedBlock blk : dinfo) { // for each block int hasdown = 0; + int firstDecomNodeIndex = -1; DatanodeInfo[] nodes = blk.getLocations(); for (int j = 0; j < nodes.length; j++) { // for each replica if (nodes[j].getName().equals(downnode)) { hasdown++; - System.out.println("Block " + blk.getBlock() + " replica " + - nodes[j].getName() + " is decommissioned."); + LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName() + + " is decommissioned."); + } + if (nodes[j].isDecommissioned()) { + if (firstDecomNodeIndex == -1) { + firstDecomNodeIndex = j; + } + continue; } + assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1); } - System.out.println("Block " + blk.getBlock() + " has " + hasdown + - " decommissioned replica."); + LOG.info("Block " + blk.getBlock() + " has " + hasdown + + " decommissioned replica."); assertEquals("Number of replicas for block" + blk.getBlock(), Math.min(numDatanodes, repl+hasdown), nodes.length); } @@ -148,21 +182,21 @@ public class TestDecommission extends Te } private void printDatanodeReport(DatanodeInfo[] info) { - System.out.println("-------------------------------------------------"); + LOG.info("-------------------------------------------------"); for (int i = 0; i < info.length; i++) { - System.out.println(info[i].getDatanodeReport()); - System.out.println(); + LOG.info(info[i].getDatanodeReport()); + LOG.info(""); } } /* * decommission one random node. */ - private String decommissionNode(FSNamesystem namesystem, - Configuration conf, - DFSClient client, - FileSystem localFileSys) + private DatanodeInfo decommissionNode(FSNamesystem namesystem, + ArrayListdecommissionedNodes, + AdminStates waitForState) throws IOException { + DFSClient client = getDfsClient(cluster, conf); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); // @@ -177,122 +211,188 @@ public class TestDecommission extends Te } } String nodename = info[index].getName(); - System.out.println("Decommissioning node: " + nodename); + LOG.info("Decommissioning node: " + nodename); // write nodename into the exclude file. ArrayList nodes = new ArrayList(decommissionedNodes); nodes.add(nodename); - writeConfigFile(localFileSys, excludeFile, nodes); + writeConfigFile(excludeFile, nodes); namesystem.refreshNodes(conf); - return nodename; - } - - /* - * Check if node is in the requested state. - */ - private boolean checkNodeState(FileSystem filesys, - String node, - NodeState state) throws IOException { - DistributedFileSystem dfs = (DistributedFileSystem) filesys; - boolean done = false; - boolean foundNode = false; - DatanodeInfo[] datanodes = dfs.getDataNodeStats(); - for (int i = 0; i < datanodes.length; i++) { - DatanodeInfo dn = datanodes[i]; - if (dn.getName().equals(node)) { - if (state == NodeState.DECOMMISSIONED) { - done = dn.isDecommissioned(); - } else if (state == NodeState.DECOMMISSION_INPROGRESS) { - done = dn.isDecommissionInProgress(); - } else { - done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned()); - } - System.out.println(dn.getDatanodeReport()); - foundNode = true; - } - } - if (!foundNode) { - throw new IOException("Could not find node: " + node); - } - return done; + DatanodeInfo ret = namesystem.getDatanode(info[index]); + waitNodeState(ret, waitForState); + return ret; } /* * Wait till node is fully decommissioned. 
*/ - private void waitNodeState(FileSystem filesys, - String node, - NodeState state) throws IOException { - boolean done = checkNodeState(filesys, node, state); + private void waitNodeState(DatanodeInfo node, + AdminStates state) throws IOException { + boolean done = state == node.getAdminState(); while (!done) { - System.out.println("Waiting for node " + node + - " to change state to " + state); + LOG.info("Waiting for node " + node + " to change state to " + + state + " current state: " + node.getAdminState()); try { - Thread.sleep(1000); + Thread.sleep(HEARTBEAT_INTERVAL * 1000); } catch (InterruptedException e) { // nothing } - done = checkNodeState(filesys, node, state); + done = state == node.getAdminState(); } } - /** - * Tests Decommission in DFS. - */ - public void testDecommission() throws IOException { - Configuration conf = new HdfsConfiguration(); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); - - // Set up the hosts/exclude files. - FileSystem localFileSys = FileSystem.getLocal(conf); - Path workingDir = localFileSys.getWorkingDirectory(); - Path dir = new Path(workingDir, "build/test/data/work-dir/decommission"); - assertTrue(localFileSys.mkdirs(dir)); - hostsFile = new Path(dir, "hosts"); - excludeFile = new Path(dir, "exclude"); - conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath()); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); - conf.setInt("dfs.heartbeat.interval", 1); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4); - writeConfigFile(localFileSys, excludeFile, null); - - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(numDatanodes).build(); - cluster.waitActive(); + /* Get DFSClient to the namenode */ + private static DFSClient getDfsClient(MiniDFSCluster cluster, + Configuration conf) throws IOException { InetSocketAddress addr = new InetSocketAddress("localhost", cluster.getNameNodePort()); - DFSClient client = new DFSClient(addr, conf); + return new DFSClient(addr, conf); + } + + /* Validate cluster has expected number of datanodes */ + private static void validateCluster(DFSClient client, int numDNs) + throws IOException { DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); - assertEquals("Number of Datanodes ", numDatanodes, info.length); - FileSystem fileSys = cluster.getFileSystem(); + assertEquals("Number of Datanodes ", numDNs, info.length); + } + + /** Start a MiniDFSCluster + * @throws IOException */ + private void startCluster(int numDatanodes, Configuration conf) + throws IOException { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) + .build(); + cluster.waitActive(); + DFSClient client = getDfsClient(cluster, conf); + validateCluster(client, numDatanodes); + } + + private void verifyStats(NameNode namenode, FSNamesystem fsn, + DatanodeInfo node, boolean decommissioning) throws InterruptedException { + // Do the stats check over 10 iterations + for (int i = 0; i < 10; i++) { + long[] newStats = namenode.getStats(); + + // For decommissioning nodes, ensure capacity of the DN is no longer + // counted. Only used space of the DN is counted in cluster capacity + assertEquals(newStats[0], decommissioning ? 
node.getDfsUsed() : + node.getCapacity()); + + // Ensure cluster used capacity is counted for both normal and + // decommissioning nodes + assertEquals(newStats[1], node.getDfsUsed()); + + // For decommissioning nodes, remaining space from the DN is not counted + assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining()); + + // Ensure transceiver count is same as that DN + assertEquals(fsn.getTotalLoad(), node.getXceiverCount()); + + Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval + } + } + /** + * Tests Decommission in DFS. + */ + @Test + public void testDecommission() throws IOException { + LOG.info("Starting test testDecommission"); + int numDatanodes = 6; + startCluster(numDatanodes, conf); try { + DFSClient client = getDfsClient(cluster, conf); + FileSystem fileSys = cluster.getFileSystem(); + FSNamesystem fsn = cluster.getNamesystem(); + ArrayList decommissionedNodes = new ArrayList(numDatanodes); for (int iteration = 0; iteration < numDatanodes - 1; iteration++) { int replicas = numDatanodes - iteration - 1; - // + // Decommission one node. Verify that node is decommissioned. - // - Path file1 = new Path("decommission.dat"); + Path file1 = new Path("testDecommission.dat"); writeFile(fileSys, file1, replicas); - System.out.println("Created file decommission.dat with " + - replicas + " replicas."); - checkFile(fileSys, file1, replicas); + LOG.info("Created file decommission.dat with " + replicas + + " replicas."); printFileLocations(fileSys, file1); - String downnode = decommissionNode(cluster.getNamesystem(), conf, - client, localFileSys); - decommissionedNodes.add(downnode); - waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED); - checkFile(fileSys, file1, replicas, downnode); + DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes, + AdminStates.DECOMMISSIONED); + decommissionedNodes.add(downnode.getName()); + + // Ensure decommissioned datanode is not automatically shutdown + assertEquals("All datanodes must be alive", numDatanodes, + client.datanodeReport(DatanodeReportType.LIVE).length); + + checkFile(fileSys, file1, replicas, downnode.getName(), numDatanodes); cleanupFile(fileSys, file1); - cleanupFile(localFileSys, dir); } + + // Restart the cluster and ensure decommissioned datanodes + // are allowed to register with the namenode + cluster.shutdown(); + startCluster(numDatanodes, conf); } catch (IOException e) { - info = client.datanodeReport(DatanodeReportType.ALL); + DFSClient client = getDfsClient(cluster, conf); + DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL); printDatanodeReport(info); throw e; - } finally { - fileSys.close(); - cluster.shutdown(); } } + + /** + * Tests cluster storage statistics during decommissioning + */ + @Test + public void testClusterStats() throws IOException, InterruptedException { + LOG.info("Starting test testClusterStats"); + int numDatanodes = 1; + startCluster(numDatanodes, conf); + + FileSystem fileSys = cluster.getFileSystem(); + Path file = new Path("testClusterStats.dat"); + writeFile(fileSys, file, 1); + + FSNamesystem fsn = cluster.getNamesystem(); + NameNode namenode = cluster.getNameNode(); + ArrayList decommissionedNodes = new ArrayList(numDatanodes); + DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes, + AdminStates.DECOMMISSION_INPROGRESS); + // Check namenode stats for multiple datanode heartbeats + verifyStats(namenode, fsn, downnode, true); + + // Stop decommissioning and verify stats + writeConfigFile(excludeFile, null); + 
fsn.refreshNodes(conf); + DatanodeInfo ret = fsn.getDatanode(downnode); + waitNodeState(ret, AdminStates.NORMAL); + verifyStats(namenode, fsn, ret, false); + } + + /** + * Test host file or include file functionality. Only datanodes + * in the include file are allowed to connect to the namenode. + */ + @Test + public void testHostsFile() throws IOException, InterruptedException { + conf.set("dfs.hosts", hostsFile.toUri().getPath()); + int numDatanodes = 1; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) + .setupHostsFile(true).build(); + cluster.waitActive(); + + // Now empty hosts file and ensure the datanode is disallowed + // from talking to namenode, resulting in its shutdown. + ArrayList<String> list = new ArrayList<String>(); + list.add("invalidhost"); + writeConfigFile(hostsFile, list); + cluster.getNamesystem().refreshNodes(conf); + + DFSClient client = getDfsClient(cluster, conf); + DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); + for (int i = 0 ; i < 5 && info.length != 0; i++) { + LOG.info("Waiting for datanode to be marked dead"); + Thread.sleep(HEARTBEAT_INTERVAL * 1000); + info = client.datanodeReport(DatanodeReportType.LIVE); + } + assertEquals("Number of live nodes should be 0", 0, info.length); + } }
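For reference, the replica-ordering part of this change (FSNamesystem applying DFSUtil.DECOM_COMPARATOR to each block's locations right after pseudoSortByDistance) can be illustrated with a small self-contained sketch. Replica below is a hypothetical stand-in for DatanodeInfo, not the real class:

import java.util.Arrays;
import java.util.Comparator;

public class DecomSortSketch {
  /** Hypothetical stand-in for DatanodeInfo: a name plus a decommissioned flag. */
  static final class Replica {
    final String name;
    final boolean decommissioned;
    Replica(String name, boolean decommissioned) {
      this.name = name;
      this.decommissioned = decommissioned;
    }
  }

  /** Replicas on decommissioned nodes sort after everything else; ties keep their order. */
  static final Comparator<Replica> DECOM_LAST = new Comparator<Replica>() {
    @Override
    public int compare(Replica a, Replica b) {
      return a.decommissioned == b.decommissioned ? 0 : (a.decommissioned ? 1 : -1);
    }
  };

  public static void main(String[] args) {
    Replica[] locations = {
        new Replica("dn1", true),   // decommissioned replica listed first by distance
        new Replica("dn2", false),
        new Replica("dn3", false),
    };
    // Arrays.sort on an object array is stable, so dn2 and dn3 keep their
    // relative (distance-based) order while dn1 moves to the end.
    Arrays.sort(locations, DECOM_LAST);
    for (Replica r : locations) {
      System.out.println(r.name + (r.decommissioned ? " (decommissioned)" : ""));
    }
  }
}

Because the sort is stable and the comparator only distinguishes decommissioned from non-decommissioned nodes, the distance-based order produced earlier is preserved within each group, and clients simply see decommissioned replicas last.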