Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 45BA9619D for ; Sat, 30 Jul 2011 00:11:11 +0000 (UTC) Received: (qmail 46699 invoked by uid 500); 30 Jul 2011 00:11:11 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 46647 invoked by uid 500); 30 Jul 2011 00:11:10 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 46633 invoked by uid 99); 30 Jul 2011 00:11:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Jul 2011 00:11:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Jul 2011 00:11:04 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 326F023888E4; Sat, 30 Jul 2011 00:10:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1152401 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apa... 
Date: Sat, 30 Jul 2011 00:10:41 -0000 To: hdfs-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110730001042.326F023888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: szetszwo Date: Sat Jul 30 00:10:39 2011 New Revision: 1152401 URL: http://svn.apache.org/viewvc?rev=1152401&view=rev Log: HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of datanodes without restarting. Contributed by Eric Payne Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java Modified: hadoop/common/trunk/hdfs/CHANGES.txt hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Modified: hadoop/common/trunk/hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hdfs/CHANGES.txt Sat Jul 30 00:10:39 2011 @@ -303,6 +303,9 @@ Trunk 
(unreleased changes) HDFS-2156. Make hdfs and mapreduce rpm only depend on the same major version for common and hdfs. (eyang via omalley) + HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of + datanodes without restarting. (Eric Payne via szetszwo) + IMPROVEMENTS HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Jul 30 00:10:39 2011 @@ -1269,6 +1269,18 @@ public class DFSClient implements FSCons public void metaSave(String pathname) throws IOException { namenode.metaSave(pathname); } + + /** + * Requests the namenode to tell all datanodes to use a new, non-persistent + * bandwidth value for dfs.balance.bandwidthPerSec. + * See {@link ClientProtocol#setBalancerBandwidth(long)} + * for more details. 
+ * + * @see ClientProtocol#setBalancerBandwidth(long) + */ + public void setBalancerBandwidth(long bandwidth) throws IOException { + namenode.setBalancerBandwidth(bandwidth); + } /** * @see ClientProtocol#finalizeUpgrade() Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Sat Jul 30 00:10:39 2011 @@ -867,4 +867,17 @@ public class DistributedFileSystem exten throws IOException { dfs.cancelDelegationToken(token); } + + /** + * Requests the namenode to tell all datanodes to use a new, non-persistent + * bandwidth value for dfs.balance.bandwidthPerSec. + * The bandwidth parameter is the max number of bytes per second of network + * bandwidth to be used by a datanode during balancing. + * + * @param bandwidth Balancer bandwidth in bytes per second for all datanodes. 
+ * @throws IOException + */ + public void setBalancerBandwidth(long bandwidth) throws IOException { + dfs.setBalancerBandwidth(bandwidth); + } } Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Jul 30 00:10:39 2011 @@ -67,9 +67,9 @@ public interface ClientProtocol extends * Compared to the previous version the following changes have been introduced: * (Only the latest change is reflected. * The log of historical changes can be retrieved from the svn). - * 67: Add block pool ID to Block + * 68: Add Balancer Bandwidth Command protocol */ - public static final long versionID = 67L; + public static final long versionID = 68L; /////////////////////////////////////// // File contents @@ -715,6 +715,15 @@ public interface ClientProtocol extends * @throws IOException */ public void metaSave(String filename) throws IOException; + + /** + * Tell all datanodes to use a new, non-persistent bandwidth value for + * dfs.balance.bandwidthPerSec. + * + * @param bandwidth Balancer bandwidth in bytes per second for this datanode. + * @throws IOException + */ + public void setBalancerBandwidth(long bandwidth) throws IOException; /** * Get the file info for a specific file or directory. 
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Sat Jul 30 00:10:39 2011 @@ -106,6 +106,14 @@ public class DatanodeDescriptor extends public boolean isAlive = false; public boolean needKeyUpdate = false; + // A system administrator can tune the balancer bandwidth parameter + // (dfs.balance.bandwidthPerSec) dynamically by calling + // "dfsadmin -setBalancerBandwidth <bandwidth>", at which point the + // following 'bandwidth' variable gets updated with the new value for each + // node. Once the heartbeat command is issued to update the value on the + // specified datanode, this value will be set back to 0. + private long bandwidth; + /** A queue of blocks to be replicated by this datanode */ private BlockQueue replicateBlocks = new BlockQueue(); /** A queue of blocks to be recovered by this datanode */ @@ -569,4 +577,20 @@ public class DatanodeDescriptor extends public void updateRegInfo(DatanodeID nodeReg) { super.updateRegInfo(nodeReg); } + + /** + * @return Balancer bandwidth in bytes per second for this datanode. + */ + public long getBalancerBandwidth() { + return this.bandwidth; + } + + /** + * @param bandwidth Balancer bandwidth in bytes per second for this datanode. 
+ */ + public void setBalancerBandwidth(long bandwidth) { + this.bandwidth = bandwidth; + } + + } Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sat Jul 30 00:10:39 2011 @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; +import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -749,7 +750,7 @@ public class DatanodeManager { return new DatanodeCommand[] { brCommand }; } - final List cmds = new ArrayList(3); + final List cmds = new ArrayList(); //check pending replication List pendingList = nodeinfo.getReplicationCommand( maxTransfers); @@ -765,6 +766,14 @@ public class DatanodeManager { } namesystem.addKeyUpdateCommand(cmds, nodeinfo); + + // check for balancer bandwidth update + if (nodeinfo.getBalancerBandwidth() > 0) { + cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth())); + // set back to 0 to indicate that datanode has been sent the new value + nodeinfo.setBalancerBandwidth(0); + } + if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } @@ -773,4 +782,26 @@ public class DatanodeManager { return null; } + + /** + * 
Tell all datanodes to use a new, non-persistent bandwidth value for + * dfs.balance.bandwidthPerSec. + * + * A system administrator can tune the balancer bandwidth parameter + * (dfs.datanode.balance.bandwidthPerSec) dynamically by calling + * "dfsadmin -setBalanacerBandwidth newbandwidth", at which point the + * following 'bandwidth' variable gets updated with the new value for each + * node. Once the heartbeat command is issued to update the value on the + * specified datanode, this value will be set back to 0. + * + * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes. + * @throws IOException + */ + public void setBalancerBandwidth(long bandwidth) throws IOException { + synchronized(datanodeMap) { + for (DatanodeDescriptor nodeInfo : datanodeMap.values()) { + nodeInfo.setBalancerBandwidth(bandwidth); + } + } + } } Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Jul 30 00:10:39 2011 @@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtocolSignature; @@ -1340,6 +1341,16 @@ public class DataNode extends Configured ((KeyUpdateCommand) cmd).getExportedKeys()); } break; + case 
DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: + LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE"); + long bandwidth = + ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue(); + if (bandwidth > 0) { + DataXceiverServer dxcs = + (DataXceiverServer) dataXceiverServer.getRunnable(); + dxcs.balanceThrottler.setBandwidth(bandwidth); + } + break; default: LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); } @@ -2774,4 +2785,15 @@ public class DataNode extends Configured return new DatanodeID(getMachineName(), getStorageId(), infoServer.getPort(), getIpcPort()); } + + /** + * Get current value of the max balancer bandwidth in bytes per second. + * + * @return bandwidth Blanacer bandwidth in bytes per second for this datanode. + */ + public Long getBalancerBandwidth() { + DataXceiverServer dxcs = + (DataXceiverServer) this.dataXceiverServer.getRunnable(); + return dxcs.balanceThrottler.getBandwidth(); + } } Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Jul 30 00:10:39 2011 @@ -5066,5 +5066,13 @@ public class FSNamesystem implements FSC getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList); } - + /** + * Tell all datanodes to use a new, non-persistent bandwidth value for + * dfs.datanode.balance.bandwidthPerSec. + * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes. 
+ * @throws IOException + */ + public void setBalancerBandwidth(long bandwidth) throws IOException { + getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth); + } } Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Sat Jul 30 00:10:39 2011 @@ -1089,6 +1089,16 @@ public class NameNode implements Namenod } return new CorruptFileBlocks(files, lastCookie); } + + /** + * Tell all datanodes to use a new, non-persistent bandwidth value for + * dfs.datanode.balance.bandwidthPerSec. + * @param bandwidth Balancer bandwidth in bytes per second for all datanodes. 
+ * @throws IOException + */ + public void setBalancerBandwidth(long bandwidth) throws IOException { + namesystem.setBalancerBandwidth(bandwidth); + } @Override // ClientProtocol public ContentSummary getContentSummary(String path) throws IOException { Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java?rev=1152401&view=auto ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java (added) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java Sat Jul 30 00:10:39 2011 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +/* + * A system administrator can tune the balancer bandwidth parameter + * (dfs.balance.bandwidthPerSec) dynamically by calling + * "dfsadmin -setBalanacerBandwidth newbandwidth". 
+ * This class is to define the command which sends the new bandwidth value to + * each datanode. + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * Balancer bandwidth command instructs each datanode to change its value for + * the max amount of network bandwidth it may use during the block balancing + * operation. + * + * The Balancer Bandwidth Command contains the new bandwidth value as its + * payload. The bandwidth value is in bytes per second. + */ +public class BalancerBandwidthCommand extends DatanodeCommand { + private final static long BBC_DEFAULTBANDWIDTH = 0L; + + private long bandwidth; + + /** + * Balancer Bandwidth Command constructor. Sets bandwidth to 0. + */ + BalancerBandwidthCommand() { + this(BBC_DEFAULTBANDWIDTH); + } + + /** + * Balancer Bandwidth Command constructor. + * + * @param bandwidth Balancer bandwidth in bytes per second. + */ + public BalancerBandwidthCommand(long bandwidth) { + super(DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE); + this.bandwidth = bandwidth; + } + + /** + * Get current value of the max balancer bandwidth in bytes per second. + * + * @return bandwidth Balancer bandwidth in bytes per second for this datanode. + */ + public long getBalancerBandwidthValue() { + return this.bandwidth; + } + + // /////////////////////////////////////////////// + // Writable + // /////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(BalancerBandwidthCommand.class, new WritableFactory() { + public Writable newInstance() { + return new BalancerBandwidthCommand(); + } + }); + } + + /** + * Writes the bandwidth payload to the Balancer Bandwidth Command packet. + * @param out DataOutput stream used for writing commands to the datanode. 
+ * @throws IOException + */ + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeLong(this.bandwidth); + } + + /** + * Reads the bandwidth payload from the Balancer Bandwidth Command packet. + * @param in DataInput stream used for reading commands to the datanode. + * @throws IOException + */ + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.bandwidth = in.readLong(); + } +} Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sat Jul 30 00:10:39 2011 @@ -45,9 +45,9 @@ import org.apache.avro.reflect.Nullable; @InterfaceAudience.Private public interface DatanodeProtocol extends VersionedProtocol { /** - * 27: Add block pool ID to Block + * 28: Add Balancer Bandwidth Command protocol. */ - public static final long versionID = 27L; + public static final long versionID = 28L; // error code final static int NOTIFY = 0; @@ -67,6 +67,7 @@ public interface DatanodeProtocol extend final static int DNA_FINALIZE = 5; // finalize previous upgrade final static int DNA_RECOVERBLOCK = 6; // request a block recovery final static int DNA_ACCESSKEYUPDATE = 7; // update access key + final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth /** * Register Datanode. 
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1152401&r1=1152400&r2=1152401&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Sat Jul 30 00:10:39 2011 @@ -452,6 +452,40 @@ public class DFSAdmin extends FsShell { return exitCode; } + /** + * Command to ask the namenode to set the balancer bandwidth for all of the + * datanodes. + * Usage: java DFSAdmin -setBalancerBandwidth bandwidth + * @param argv List of command line parameters. + * @param idx The index of the command that is being processed. + * @exception IOException + */ + public int setBalancerBandwidth(String[] argv, int idx) throws IOException { + long bandwidth; + int exitCode = -1; + + try { + bandwidth = Long.parseLong(argv[idx]); + } catch (NumberFormatException nfe) { + System.err.println("NumberFormatException: " + nfe.getMessage()); + System.err.println("Usage: java DFSAdmin" + + " [-setBalancerBandwidth <bandwidth in bytes per second>]"); + return exitCode; + } + + FileSystem fs = getFS(); + if (!(fs instanceof DistributedFileSystem)) { + System.err.println("FileSystem is " + fs.getUri()); + return exitCode; + } + + DistributedFileSystem dfs = (DistributedFileSystem) fs; + dfs.setBalancerBandwidth(bandwidth); + exitCode = 0; + + return exitCode; + } + private void printHelp(String cmd) { String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" + "The full syntax is: \n\n" + @@ -469,6 +503,7 @@ public class DFSAdmin extends FsShell { "\t[-printTopology]\n" + "\t[-refreshNamenodes datanodehost:port]\n"+ "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+ + "\t[-setBalancerBandwidth <bandwidth>]\n" + "\t[-help [cmd]]\n"; String 
report ="-report: \tReports basic filesystem information and statistics.\n"; @@ -546,6 +581,14 @@ public class DFSAdmin extends FsShell { "\t\t will fail if datanode is still serving the block pool.\n" + "\t\t Refer to refreshNamenodes to shutdown a block pool\n" + "\t\t service on a datanode.\n"; + + String setBalancerBandwidth = "-setBalancerBandwidth <bandwidth>:\n" + + "\tChanges the network bandwidth used by each datanode during\n" + + "\tHDFS block balancing.\n\n" + + "\t\t<bandwidth> is the maximum number of bytes per second\n" + + "\t\tthat will be used by each datanode. This value overrides\n" + + "\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" + + "\t\t--- NOTE: The new value is not persistent on the DataNode.---\n"; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -586,6 +629,8 @@ public class DFSAdmin extends FsShell { System.out.println(refreshNamenodes); } else if ("deleteBlockPool".equals(cmd)) { System.out.println(deleteBlockPool); + } else if ("setBalancerBandwidth".equals(cmd)) { + System.out.println(setBalancerBandwidth); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -879,6 +924,9 @@ public class DFSAdmin extends FsShell { } else if ("-deleteBlockPool".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-deleteBlockPool datanode-host:port blockpoolId [force]]"); + } else if ("-setBalancerBandwidth".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-setBalancerBandwidth <bandwidth in bytes per second>]"); } else { System.err.println("Usage: java DFSAdmin"); System.err.println(" [-report]"); @@ -899,6 +947,7 @@ public class DFSAdmin extends FsShell { System.err.println(" ["+ClearQuotaCommand.USAGE+"]"); System.err.println(" ["+SetSpaceQuotaCommand.USAGE+"]"); System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]"); + System.err.println(" [-setBalancerBandwidth <bandwidth in bytes per second>]"); System.err.println(" [-help [cmd]]"); System.err.println(); ToolRunner.printGenericCommandUsage(System.err); 
@@ -990,6 +1039,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-setBalancerBandwidth".equals(cmd)) { + if (argv.length != 2) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -1042,6 +1096,8 @@ public class DFSAdmin extends FsShell { exitCode = refreshNamenodes(argv, i); } else if ("-deleteBlockPool".equals(cmd)) { exitCode = deleteBlockPool(argv, i); + } else if ("-setBalancerBandwidth".equals(cmd)) { + exitCode = setBalancerBandwidth(argv, i); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java?rev=1152401&view=auto ============================================================================== --- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java (added) +++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java Sat Jul 30 00:10:39 2011 @@ -0,0 +1,93 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.hdfs; + +import java.util.ArrayList; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.DFSConfigKeys; + +/** + * This test ensures that the balancer bandwidth is dynamically adjusted + * correctly. + */ +public class TestBalancerBandwidth extends TestCase { + final static private Configuration conf = new Configuration(); + final static private int NUM_OF_DATANODES = 2; + final static private int DEFAULT_BANDWIDTH = 1024*1024; + public static final Log LOG = LogFactory.getLog(TestBalancerBandwidth.class); + + public void testBalancerBandwidth() throws Exception { + /* Set bandwidthPerSec to a low value of 1M bps. */ + conf.setLong( + DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, + DEFAULT_BANDWIDTH); + + /* Create and start cluster */ + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build(); + try { + cluster.waitActive(); + + DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem(); + + ArrayList datanodes = cluster.getDataNodes(); + // Ensure value from the configuration is reflected in the datanodes. + assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(0).getBalancerBandwidth()); + assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(1).getBalancerBandwidth()); + + // Dynamically change balancer bandwidth and ensure the updated value + // is reflected on the datanodes. + long newBandwidth = 12 * DEFAULT_BANDWIDTH; // 12M bps + fs.setBalancerBandwidth(newBandwidth); + + // Give it a few seconds to propogate new the value to the datanodes. 
+ try { + Thread.sleep(5000); + } catch (Exception e) {} + + assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth()); + assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth()); + + // Dynamically change balancer bandwidth to 0. Balancer bandwidth on the + // datanodes should remain as it was. + fs.setBalancerBandwidth(0); + + // Give it a few seconds to propagate the new value to the datanodes. + try { + Thread.sleep(5000); + } catch (Exception e) {} + + assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth()); + assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth()); + }finally { + cluster.shutdown(); + } + } + + public static void main(String[] args) throws Exception { + new TestBalancerBandwidth().testBalancerBandwidth(); + } +}