Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r1099687 [3/15] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/pro...
Date: Thu, 05 May 2011 05:40:13 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org
Message-Id: <20110505054017.127B723889E7@eris.apache.org>

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu May 5 05:40:07 2011
@@ -24,16 +24,14 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
-import java.util.EnumSet;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -46,45 +44,31 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryProxy; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -187,27 +171,19 @@ import org.apache.hadoop.util.ToolRunner */ @InterfaceAudience.Private -public class Balancer implements Tool { - private static final Log LOG = - LogFactory.getLog(Balancer.class.getName()); +public class Balancer { + static final Log LOG = LogFactory.getLog(Balancer.class); final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB + private static long WIN_WIDTH = 5400*1000L; // 1.5 hour /** The maximum number of concurrent blocks moves for * balancing purpose at a datanode */ public static final int MAX_NUM_CONCURRENT_MOVES = 5; - private Configuration conf; - - private double threshold = 10D; - private NamenodeProtocol namenode; - private ClientProtocol client; - private FileSystem fs; - private boolean isBlockTokenEnabled; - private boolean shouldRun; - private long keyUpdaterInterval; - private BlockTokenSecretManager blockTokenSecretManager; - private Daemon keyupdaterthread = null; // AccessKeyUpdater thread + private final NameNodeConnector nnc; + private final BalancingPolicy policy; + private final double threshold; private final static Random rnd = new Random(); // all data node lists @@ -233,8 +209,6 @@ public class Balancer implements Tool { private NetworkTopology cluster = new NetworkTopology(); - private double avgUtilization = 0.0D; - final static private int MOVER_THREAD_POOL_SIZE = 1000; final private ExecutorService moverExecutor = Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE); @@ -242,6 +216,7 @@ public class Balancer implements Tool { final private ExecutorService dispatcherExecutor = 
Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE); + /* This class keeps track of a scheduled block move */ private class PendingBlockMove { private BalancerBlock block; @@ -369,14 +344,9 @@ public class Balancer implements Tool { /* Send a block replace request to the output stream*/ private void sendRequest(DataOutputStream out) throws IOException { - Token accessToken = BlockTokenSecretManager.DUMMY_TOKEN; - if (isBlockTokenEnabled) { - accessToken = blockTokenSecretManager.generateToken(null, block - .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE, - BlockTokenSecretManager.AccessMode.COPY)); - } - DataTransferProtocol.Sender.opReplaceBlock(out, - block.getBlock(), source.getStorageID(), + final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); + final Token accessToken = nnc.getAccessToken(eb); + DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(), proxySource.getDatanode(), accessToken); } @@ -487,30 +457,33 @@ public class Balancer implements Tool { } } - /* Return the utilization of a datanode */ - static private double getUtilization(DatanodeInfo datanode) { - return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100; - } /* A class that keeps track of a datanode in Balancer */ private static class BalancerDatanode { final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB - protected DatanodeInfo datanode; - private double utilization; - protected long maxSizeToMove; + final DatanodeInfo datanode; + final double utilization; + final long maxSize2Move; protected long scheduledSize = 0L; // blocks being moved but not confirmed yet private List pendingBlocks = new ArrayList(MAX_NUM_CONCURRENT_MOVES); + @Override + public String toString() { + return getClass().getSimpleName() + "[" + getName() + + ", utilization=" + utilization + "]"; + } + /* Constructor * Depending on avgutil & threshold, calculate maximum bytes to move */ - private BalancerDatanode( - DatanodeInfo node, double avgUtil, double threshold) { + private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) { datanode = node; - utilization = Balancer.getUtilization(node); - + utilization = policy.getUtilization(node); + final double avgUtil = policy.getAvgUtilization(); + long maxSizeToMove; + if (utilization >= avgUtil+threshold || utilization <= avgUtil-threshold) { maxSizeToMove = (long)(threshold*datanode.getCapacity()/100); @@ -521,7 +494,7 @@ public class Balancer implements Tool { if (utilization < avgUtil ) { maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove); } - maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); + this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); } /** Get the datanode */ @@ -541,12 +514,12 @@ public class Balancer implements Tool { /** Decide if still need to move more bytes */ protected boolean isMoveQuotaFull() { - return scheduledSize(); /* constructor */ - private Source(DatanodeInfo node, double avgUtil, double threshold) { - super(node, avgUtil, threshold); + private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) { + super(node, policy, threshold); } /** Add a node task */ @@ -626,7 +599,7 @@ public class Balancer implements Tool { * Return the total size of the received blocks in the number of bytes. 
*/ private long getBlockList() throws IOException { - BlockWithLocations[] newBlocks = namenode.getBlocks(datanode, + BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks(); long bytesReceived = 0; for (BlockWithLocations blk : newBlocks) { @@ -780,160 +753,25 @@ public class Balancer implements Tool { /* Check that this Balancer is compatible with the Block Placement Policy * used by the Namenode. */ - private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException { + private static void checkReplicationPolicyCompatibility(Configuration conf + ) throws UnsupportedActionException { if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != BlockPlacementPolicyDefault.class) { throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault"); } } - - /** Default constructor */ - Balancer() throws UnsupportedActionException { - } - - /** Construct a balancer from the given configuration */ - Balancer(Configuration conf) throws UnsupportedActionException { - checkReplicationPolicyCompatibility(conf); - setConf(conf); - } - - /** Construct a balancer from the given configuration and threshold */ - Balancer(Configuration conf, double threshold) throws UnsupportedActionException { - checkReplicationPolicyCompatibility(conf); - setConf(conf); - this.threshold = threshold; - } /** - * Run a balancer - * @param args - */ - public static void main(String[] args) { - try { - System.exit( ToolRunner.run(null, new Balancer(), args) ); - } catch (Throwable e) { - LOG.error(StringUtils.stringifyException(e)); - System.exit(-1); - } - - } - - private static void printUsage() { - System.out.println("Usage: java Balancer"); - System.out.println(" [-threshold ]\t" - +"percentage of disk capacity"); - } - - /* parse argument to get the threshold */ - private double parseArgs(String[] args) { - double threshold=0; - int argsLen = (args == null) ? 0 : args.length; - if (argsLen==0) { - threshold = 10; - } else { - if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) { - printUsage(); - throw new IllegalArgumentException(Arrays.toString(args)); - } else { - try { - threshold = Double.parseDouble(args[1]); - if (threshold < 0 || threshold >100) { - throw new NumberFormatException(); - } - LOG.info( "Using a threshold of " + threshold ); - } catch(NumberFormatException e) { - System.err.println( - "Expect a double parameter in the range of [0, 100]: "+ args[1]); - printUsage(); - throw e; - } - } - } - return threshold; - } - - /* Initialize balancer. It sets the value of the threshold, and + * Construct a balancer. + * Initialize balancer. It sets the value of the threshold, and * builds the communication proxies to * namenode as a client and a secondary namenode and retry proxies * when connection fails. 
*/ - private void init(double threshold) throws IOException { - this.threshold = threshold; - this.namenode = createNamenode(conf); - this.client = DFSClient.createNamenode(conf); - this.fs = FileSystem.get(conf); - ExportedBlockKeys keys = namenode.getBlockKeys(); - this.isBlockTokenEnabled = keys.isBlockTokenEnabled(); - if (isBlockTokenEnabled) { - long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); - long blockTokenLifetime = keys.getTokenLifetime(); - LOG.info("Block token params received from NN: keyUpdateInterval=" - + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime=" - + blockTokenLifetime / (60 * 1000) + " min(s)"); - this.blockTokenSecretManager = new BlockTokenSecretManager(false, - blockKeyUpdateInterval, blockTokenLifetime); - this.blockTokenSecretManager.setKeys(keys); - /* - * Balancer should sync its block keys with NN more frequently than NN - * updates its block keys - */ - this.keyUpdaterInterval = blockKeyUpdateInterval / 4; - LOG.info("Balancer will update its block keys every " - + keyUpdaterInterval / (60 * 1000) + " minute(s)"); - this.keyupdaterthread = new Daemon(new BlockKeyUpdater()); - this.shouldRun = true; - this.keyupdaterthread.start(); - } - } - - /** - * Periodically updates access keys. - */ - class BlockKeyUpdater implements Runnable { - - public void run() { - while (shouldRun) { - try { - blockTokenSecretManager.setKeys(namenode.getBlockKeys()); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - try { - Thread.sleep(keyUpdaterInterval); - } catch (InterruptedException ie) { - } - } - } - } - - /* Build a NamenodeProtocol connection to the namenode and - * set up the retry policy */ - private static NamenodeProtocol createNamenode(Configuration conf) - throws IOException { - InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true); - RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry( - 5, 200, TimeUnit.MILLISECONDS); - Map,RetryPolicy> exceptionToPolicyMap = - new HashMap, RetryPolicy>(); - RetryPolicy methodPolicy = RetryPolicies.retryByException( - timeoutPolicy, exceptionToPolicyMap); - Map methodNameToPolicyMap = - new HashMap(); - methodNameToPolicyMap.put("getBlocks", methodPolicy); - methodNameToPolicyMap.put("getAccessKeys", methodPolicy); - - UserGroupInformation ugi; - ugi = UserGroupInformation.getCurrentUser(); - - return (NamenodeProtocol) RetryProxy.create( - NamenodeProtocol.class, - RPC.getProxy(NamenodeProtocol.class, - NamenodeProtocol.versionID, - nameNodeAddr, - ugi, - conf, - NetUtils.getDefaultSocketFactory(conf)), - methodNameToPolicyMap); + Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + this.threshold = p.threshold; + this.policy = p.policy; + this.nnc = theblockpool; } /* Shuffle datanode array */ @@ -946,13 +784,6 @@ public class Balancer implements Tool { } } - /* get all live datanodes of a cluster and their disk usage - * decide the number of bytes need to be moved - */ - private long initNodes() throws IOException { - return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE)); - } - /* Given a data node set, build a network topology and decide * over-utilized datanodes, above average utilized datanodes, * below average utilized datanodes, and underutilized datanodes. 
@@ -968,15 +799,13 @@ public class Balancer implements Tool { */ private long initNodes(DatanodeInfo[] datanodes) { // compute average utilization - long totalCapacity=0L, totalUsedSpace=0L; for (DatanodeInfo datanode : datanodes) { if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { continue; // ignore decommissioning or decommissioned nodes } - totalCapacity += datanode.getCapacity(); - totalUsedSpace += datanode.getDfsUsed(); + policy.accumulateSpaces(datanode); } - this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100; + policy.initAvgUtilization(); /*create network topology and all data node lists: * overloaded, above-average, below-average, and underloaded @@ -991,19 +820,20 @@ public class Balancer implements Tool { } cluster.add(datanode); BalancerDatanode datanodeS; - if (getUtilization(datanode) > avgUtilization) { - datanodeS = new Source(datanode, avgUtilization, threshold); + final double avg = policy.getAvgUtilization(); + if (policy.getUtilization(datanode) > avg) { + datanodeS = new Source(datanode, policy, threshold); if (isAboveAvgUtilized(datanodeS)) { this.aboveAvgUtilizedDatanodes.add((Source)datanodeS); } else { assert(isOverUtilized(datanodeS)) : datanodeS.getName()+ "is not an overUtilized node"; this.overUtilizedDatanodes.add((Source)datanodeS); - overLoadedBytes += (long)((datanodeS.utilization-avgUtilization + overLoadedBytes += (long)((datanodeS.utilization-avg -threshold)*datanodeS.datanode.getCapacity()/100.0); } } else { - datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold); + datanodeS = new BalancerDatanode(datanode, policy, threshold); if ( isBelowOrEqualAvgUtilized(datanodeS)) { this.belowAvgUtilizedDatanodes.add(datanodeS); } else { @@ -1011,7 +841,7 @@ public class Balancer implements Tool { + datanodeS.getName() + ")=" + isUnderUtilized(datanodeS) + ", utilization=" + datanodeS.utilization; this.underUtilizedDatanodes.add(datanodeS); - underLoadedBytes += (long)((avgUtilization-threshold- + underLoadedBytes += (long)((avg-threshold- datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); } } @@ -1019,7 +849,7 @@ public class Balancer implements Tool { } //logging - logImbalancedNodes(); + logNodes(); assert (this.datanodes.size() == overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ @@ -1031,25 +861,20 @@ public class Balancer implements Tool { } /* log the over utilized & under utilized nodes */ - private void logImbalancedNodes() { - StringBuilder msg = new StringBuilder(); - msg.append(overUtilizedDatanodes.size()); - msg.append(" over utilized nodes:"); - for (Source node : overUtilizedDatanodes) { - msg.append( " " ); - msg.append( node.getName() ); - } - LOG.info(msg); - msg = new StringBuilder(); - msg.append(underUtilizedDatanodes.size()); - msg.append(" under utilized nodes: "); - for (BalancerDatanode node : underUtilizedDatanodes) { - msg.append( " " ); - msg.append( node.getName() ); - } - LOG.info(msg); + private void logNodes() { + logNodes("over-utilized", overUtilizedDatanodes); + if (LOG.isTraceEnabled()) { + logNodes("above-average", aboveAvgUtilizedDatanodes); + logNodes("below-average", belowAvgUtilizedDatanodes); + } + logNodes("underutilized", underUtilizedDatanodes); } - + + private static void logNodes( + String name, Collection nodes) { + LOG.info(nodes.size() + " " + name + ": " + nodes); + } + /* Decide all pairs and * the number of bytes to move from a source to a target * Maximum bytes to be moved per node is @@ -1313,7 +1138,6 @@ public class Balancer 
implements Tool { */ private static class MovedBlocks { private long lastCleanupTime = System.currentTimeMillis(); - private static long winWidth = 5400*1000L; // 1.5 hour final private static int CUR_WIN = 0; final private static int OLD_WIN = 1; final private static int NUM_WINS = 2; @@ -1326,13 +1150,6 @@ public class Balancer implements Tool { movedBlocks.add(new HashMap()); } - /* set the win width */ - private void setWinWidth(Configuration conf) { - winWidth = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); - } - /* add a block thus marking a block to be moved */ synchronized private void add(BalancerBlock block) { movedBlocks.get(CUR_WIN).put(block.getBlock(), block); @@ -1353,7 +1170,7 @@ public class Balancer implements Tool { synchronized private void cleanup() { long curTime = System.currentTimeMillis(); // check if old win is older than winWidth - if (lastCleanupTime + winWidth <= curTime) { + if (lastCleanupTime + WIN_WIDTH <= curTime) { // purge the old window movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN)); movedBlocks.set(CUR_WIN, new HashMap()); @@ -1419,7 +1236,7 @@ public class Balancer implements Tool { this.datanodes.clear(); this.sources.clear(); this.targets.clear(); - this.avgUtilization = 0.0D; + this.policy.reset(); cleanGlobalBlockList(); this.movedBlocks.cleanup(); } @@ -1439,182 +1256,172 @@ public class Balancer implements Tool { /* Return true if the given datanode is overUtilized */ private boolean isOverUtilized(BalancerDatanode datanode) { - return datanode.utilization > (avgUtilization+threshold); + return datanode.utilization > (policy.getAvgUtilization()+threshold); } /* Return true if the given datanode is above average utilized * but not overUtilized */ private boolean isAboveAvgUtilized(BalancerDatanode datanode) { - return (datanode.utilization <= (avgUtilization+threshold)) - && (datanode.utilization > avgUtilization); + final double avg = policy.getAvgUtilization(); + return (datanode.utilization <= (avg+threshold)) + && (datanode.utilization > avg); } /* Return true if the given datanode is underUtilized */ private boolean isUnderUtilized(BalancerDatanode datanode) { - return datanode.utilization < (avgUtilization-threshold); + return datanode.utilization < (policy.getAvgUtilization()-threshold); } /* Return true if the given datanode is below average utilized * but not underUtilized */ private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) { - return (datanode.utilization >= (avgUtilization-threshold)) - && (datanode.utilization <= avgUtilization); + final double avg = policy.getAvgUtilization(); + return (datanode.utilization >= (avg-threshold)) + && (datanode.utilization <= avg); } // Exit status - final public static int SUCCESS = 1; - final public static int ALREADY_RUNNING = -1; - final public static int NO_MOVE_BLOCK = -2; - final public static int NO_MOVE_PROGRESS = -3; - final public static int IO_EXCEPTION = -4; - final public static int ILLEGAL_ARGS = -5; - /** main method of Balancer - * @param args arguments to a Balancer - * @throws Exception exception that occured during datanode balancing - */ - public int run(String[] args) throws Exception { - long startTime = Util.now(); - OutputStream out = null; + enum ReturnStatus { + SUCCESS(1), + IN_PROGRESS(0), + ALREADY_RUNNING(-1), + NO_MOVE_BLOCK(-2), + NO_MOVE_PROGRESS(-3), + IO_EXCEPTION(-4), + ILLEGAL_ARGS(-5), + INTERRUPTED(-6); + + final int code; + + ReturnStatus(int code) { + this.code = code; + 
} + } + + /** Run an iteration for all datanodes. */ + private ReturnStatus run(int iteration, Formatter formatter) { try { - // initialize a balancer - init(parseArgs(args)); + /* get all live datanodes of a cluster and their disk usage + * decide the number of bytes need to be moved + */ + final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE)); + if (bytesLeftToMove == 0) { + System.out.println("The cluster is balanced. Exiting..."); + return ReturnStatus.SUCCESS; + } else { + LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + + " to make the cluster balanced." ); + } - /* Check if there is another balancer running. - * Exit if there is another one running. + /* Decide all the nodes that will participate in the block move and + * the number of bytes that need to be moved from one node to another + * in this iteration. Maximum bytes to be moved per node is + * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). */ - out = checkAndMarkRunningBalancer(); - if (out == null) { - System.out.println("Another balancer is running. Exiting..."); - return ALREADY_RUNNING; + final long bytesToMove = chooseNodes(); + if (bytesToMove == 0) { + System.out.println("No block can be moved. Exiting..."); + return ReturnStatus.NO_MOVE_BLOCK; + } else { + LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + + " in this iteration"); } - Formatter formatter = new Formatter(System.out); - System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); - int iterations = 0; - while (true ) { - /* get all live datanodes of a cluster and their disk usage - * decide the number of bytes need to be moved - */ - long bytesLeftToMove = initNodes(); - if (bytesLeftToMove == 0) { - System.out.println("The cluster is balanced. Exiting..."); - return SUCCESS; - } else { - LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) - +" bytes to make the cluster balanced." ); - } - - /* Decide all the nodes that will participate in the block move and - * the number of bytes that need to be moved from one node to another - * in this iteration. Maximum bytes to be moved per node is - * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). - */ - long bytesToMove = chooseNodes(); - if (bytesToMove == 0) { - System.out.println("No block can be moved. Exiting..."); - return NO_MOVE_BLOCK; - } else { - LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + - "bytes in this iteration"); - } - - formatter.format("%-24s %10d %19s %18s %17s\n", - DateFormat.getDateTimeInstance().format(new Date()), - iterations, - StringUtils.byteDesc(bytesMoved.get()), - StringUtils.byteDesc(bytesLeftToMove), - StringUtils.byteDesc(bytesToMove) - ); - - /* For each pair of , start a thread that repeatedly - * decide a block to be moved and its proxy source, - * then initiates the move until all bytes are moved or no more block - * available to move. - * Exit no byte has been moved for 5 consecutive iterations. - */ - if (dispatchBlockMoves() > 0) { - notChangedIterations = 0; - } else { - notChangedIterations++; - if (notChangedIterations >= 5) { - System.out.println( - "No block has been moved for 5 iterations. 
Exiting..."); - return NO_MOVE_PROGRESS; - } - } - - // clean all lists - resetData(); - - try { - Thread.sleep(2000*conf.getLong( - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT)); - } catch (InterruptedException ignored) { + formatter.format("%-24s %10d %19s %18s %17s\n", + DateFormat.getDateTimeInstance().format(new Date()), + iteration, + StringUtils.byteDesc(bytesMoved.get()), + StringUtils.byteDesc(bytesLeftToMove), + StringUtils.byteDesc(bytesToMove) + ); + + /* For each pair of , start a thread that repeatedly + * decide a block to be moved and its proxy source, + * then initiates the move until all bytes are moved or no more block + * available to move. + * Exit no byte has been moved for 5 consecutive iterations. + */ + if (dispatchBlockMoves() > 0) { + notChangedIterations = 0; + } else { + notChangedIterations++; + if (notChangedIterations >= 5) { + System.out.println( + "No block has been moved for 5 iterations. Exiting..."); + return ReturnStatus.NO_MOVE_PROGRESS; } - - iterations++; } - } catch (IllegalArgumentException ae) { - return ILLEGAL_ARGS; + + // clean all lists + resetData(); + return ReturnStatus.IN_PROGRESS; + } catch (IllegalArgumentException e) { + System.out.println(e + ". Exiting ..."); + return ReturnStatus.ILLEGAL_ARGS; } catch (IOException e) { - System.out.println("Received an IO exception: " + e.getMessage() + - " . Exiting..."); - return IO_EXCEPTION; + System.out.println(e + ". Exiting ..."); + return ReturnStatus.IO_EXCEPTION; + } catch (InterruptedException e) { + System.out.println(e + ". Exiting ..."); + return ReturnStatus.INTERRUPTED; } finally { // shutdown thread pools dispatcherExecutor.shutdownNow(); moverExecutor.shutdownNow(); - - shouldRun = false; - try { - if (keyupdaterthread != null) keyupdaterthread.interrupt(); - } catch (Exception e) { - LOG.warn("Exception shutting down access key updater thread", e); - } - // close the output file - IOUtils.closeStream(out); - if (fs != null) { - try { - fs.delete(BALANCER_ID_PATH, true); - } catch(IOException ignored) { - } - } - System.out.println("Balancing took " + - time2Str(Util.now()-startTime)); } } - private Path BALANCER_ID_PATH = new Path("/system/balancer.id"); - /* The idea for making sure that there is no more than one balancer - * running in an HDFS is to create a file in the HDFS, writes the IP address - * of the machine on which the balancer is running to the file, but did not - * close the file until the balancer exits. - * This prevents the second balancer from running because it can not - * creates the file while the first one is running. - * - * This method checks if there is any running balancer and - * if no, mark yes if no. - * Note that this is an atomic operation. - * - * Return null if there is a running balancer; otherwise the output stream - * to the newly created file. + /** + * Balance all namenodes. + * For each iteration, + * for each namenode, + * execute a {@link Balancer} to work through all datanodes once. 
*/ - private OutputStream checkAndMarkRunningBalancer() throws IOException { + static int run(List namenodes, final Parameters p, + Configuration conf) throws IOException, InterruptedException { + final long sleeptime = 2000*conf.getLong( + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); + LOG.info("namenodes = " + namenodes); + LOG.info("p = " + p); + + final Formatter formatter = new Formatter(System.out); + System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); + + final List connectors + = new ArrayList(namenodes.size()); try { - DataOutputStream out = fs.create(BALANCER_ID_PATH); - out. writeBytes(InetAddress.getLocalHost().getHostName()); - out.flush(); - return out; - } catch(RemoteException e) { - if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ - return null; - } else { - throw e; + for(InetSocketAddress isa : namenodes) { + connectors.add(new NameNodeConnector(isa, conf)); + } + + boolean done = false; + for(int iteration = 0; !done; iteration++) { + done = true; + Collections.shuffle(connectors); + for(NameNodeConnector nnc : connectors) { + final Balancer b = new Balancer(nnc, p, conf); + final ReturnStatus r = b.run(iteration, formatter); + if (r == ReturnStatus.IN_PROGRESS) { + done = false; + } else if (r != ReturnStatus.SUCCESS) { + //must be an error statue, return. + return r.code; + } + } + + if (!done) { + Thread.sleep(sleeptime); + } + } + } finally { + for(NameNodeConnector nnc : connectors) { + nnc.close(); } } + return ReturnStatus.SUCCESS.code; } - + /* Given elaspedTime in ms, return a printable string */ private static String time2Str(long elapsedTime) { String unit; @@ -1635,15 +1442,116 @@ public class Balancer implements Tool { return time+" "+unit; } - /** return this balancer's configuration */ - public Configuration getConf() { - return conf; + static class Parameters { + static final Parameters DEFALUT = new Parameters( + BalancingPolicy.Node.INSTANCE, 10.0); + + final BalancingPolicy policy; + final double threshold; + + Parameters(BalancingPolicy policy, double threshold) { + this.policy = policy; + this.threshold = threshold; + } + + @Override + public String toString() { + return Balancer.class.getSimpleName() + "." + getClass().getSimpleName() + + "[" + policy + ", threshold=" + threshold + "]"; + } } - /** set this balancer's configuration */ - public void setConf(Configuration conf) { - this.conf = conf; - movedBlocks.setWinWidth(conf); + static class Cli extends Configured implements Tool { + /** Parse arguments and then run Balancer */ + @Override + public int run(String[] args) { + final long startTime = Util.now(); + final Configuration conf = getConf(); + WIN_WIDTH = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); + + try { + checkReplicationPolicyCompatibility(conf); + + final List namenodes = DFSUtil.getNNServiceRpcAddresses(conf); + return Balancer.run(namenodes, parse(args), conf); + } catch (IOException e) { + System.out.println(e + ". Exiting ..."); + return ReturnStatus.IO_EXCEPTION.code; + } catch (InterruptedException e) { + System.out.println(e + ". 
Exiting ..."); + return ReturnStatus.INTERRUPTED.code; + } finally { + System.out.println("Balancing took " + time2Str(Util.now()-startTime)); + } + } + + /** parse command line arguments */ + static Parameters parse(String[] args) { + BalancingPolicy policy = Parameters.DEFALUT.policy; + double threshold = Parameters.DEFALUT.threshold; + + if (args != null) { + try { + for(int i = 0; i < args.length; i++) { + if ("-threshold".equalsIgnoreCase(args[i])) { + i++; + try { + threshold = Double.parseDouble(args[i]); + if (threshold < 0 || threshold > 100) { + throw new NumberFormatException( + "Number out of range: threshold = " + threshold); + } + LOG.info( "Using a threshold of " + threshold ); + } catch(NumberFormatException e) { + System.err.println( + "Expecting a number in the range of [0.0, 100.0]: " + + args[i]); + throw e; + } + } else if ("-policy".equalsIgnoreCase(args[i])) { + i++; + try { + policy = BalancingPolicy.parse(args[i]); + } catch(IllegalArgumentException e) { + System.err.println("Illegal policy name: " + args[i]); + throw e; + } + } else { + throw new IllegalArgumentException("args = " + + Arrays.toString(args)); + } + } + } catch(RuntimeException e) { + printUsage(); + throw e; + } + } + + return new Parameters(policy, threshold); + } + + private static void printUsage() { + System.out.println("Usage: java " + Balancer.class.getSimpleName()); + System.out.println(" [-policy ]\tthe balancing policy: " + + BalancingPolicy.Node.INSTANCE.getName() + " or " + + BalancingPolicy.Pool.INSTANCE.getName()); + System.out.println( + " [-threshold ]\tPercentage of disk capacity"); + } } + /** + * Run a balancer + * @param args Command line arguments + */ + public static void main(String[] args) { + try { + System.exit(ToolRunner.run(null, new Cli(), args)); + } catch (Throwable e) { + LOG.error(StringUtils.stringifyException(e)); + System.exit(-1); + } + } } Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1099687&r1=1099686&r2=1099687&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original) +++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu May 5 05:40:07 2011 @@ -41,6 +41,8 @@ public interface HdfsConstants { /** Startup options */ static public enum StartupOption{ FORMAT ("-format"), + CLUSTERID ("-clusterid"), + GENCLUSTERID ("-genclusterid"), REGULAR ("-regular"), BACKUP ("-backup"), CHECKPOINT("-checkpoint"), @@ -50,6 +52,10 @@ public interface HdfsConstants { IMPORT ("-importCheckpoint"); private String name = null; + + // Used only with format and upgrade options + private String clusterId = null; + private StartupOption(String arg) {this.name = arg;} public String getName() {return name;} public NamenodeRole toNodeRole() { @@ -62,7 +68,14 @@ public interface HdfsConstants { return NamenodeRole.ACTIVE; } } - + + public void setClusterId(String cid) { + clusterId = cid; + } + + public String getClusterId() { + return clusterId; + } } // Timeouts for communicating with DataNode for streaming writes/reads Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java URL: 
http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1099687&r1=1099686&r2=1099687&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original) +++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu May 5 05:40:07 2011 @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Random; import java.util.TreeSet; +import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.servlet.jsp.JspWriter; @@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.BlockReade import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -66,6 +68,7 @@ public class JspHelper { public static final String CURRENT_CONF = "current.conf"; final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi"; public static final String DELEGATION_PARAMETER_NAME = "delegation"; + public static final String NAMENODE_ADDRESS = "nnaddr"; static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME + "="; private static final Log LOG = LogFactory.getLog(JspHelper.class); @@ -181,7 +184,7 @@ public class JspHelper { return chosenNode; } - public static void streamBlockInAscii(InetSocketAddress addr, + public static void streamBlockInAscii(InetSocketAddress addr, String poolId, long blockId, Token blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, JspWriter out, Configuration conf) throws IOException { @@ -193,9 +196,9 @@ public class JspHelper { long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock); // Use the block name for file name. - String file = BlockReader.getFileName(addr, blockId); + String file = BlockReader.getFileName(addr, poolId, blockId); BlockReader blockReader = BlockReader.newBlockReader(s, file, - new Block(blockId, 0, genStamp), blockToken, + new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096)); byte[] buf = new byte[(int)amtToRead]; @@ -359,14 +362,16 @@ public class JspHelper { public static void printPathWithLinks(String dir, JspWriter out, int namenodeInfoPort, - String tokenString + String tokenString, + String nnAddress ) throws IOException { try { String[] parts = dir.split(Path.SEPARATOR); StringBuilder tempPath = new StringBuilder(dir.length()); out.print("" + Path.SEPARATOR + + getDelegationTokenUrlParam(tokenString) + + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR + ""); tempPath.append(Path.SEPARATOR); for (int i = 0; i < parts.length-1; i++) { @@ -374,7 +379,8 @@ public class JspHelper { tempPath.append(parts[i]); out.print("" + parts[i] + "" + Path.SEPARATOR); tempPath.append(Path.SEPARATOR); } @@ -391,7 +397,8 @@ public class JspHelper { public static void printGotoForm(JspWriter out, int namenodeInfoPort, String tokenString, - String file) throws IOException { + String file, + String nnAddress) throws IOException { out.print("
"); out.print("Goto : "); out.print(""); @@ -402,6 +409,8 @@ public class JspHelper { out.print(""); } + out.print(""); out.print("
"); } @@ -475,16 +484,43 @@ public class JspHelper { return UserGroupInformation.createRemoteUser(strings[0]); } + private static String getNNServiceAddress(ServletContext context, + HttpServletRequest request) { + String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS); + InetSocketAddress namenodeAddress = null; + if (namenodeAddressInUrl != null) { + namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl); + } else if (context != null) { + namenodeAddress = (InetSocketAddress) context + .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY); + } + if (namenodeAddress != null) { + return (namenodeAddress.getAddress().getHostAddress() + ":" + + namenodeAddress.getPort()); + } + return null; + } + + /** + * See + * {@link JspHelper#getUGI(ServletContext, HttpServletRequest, Configuration)} + * , ServletContext is passed as null. + */ + public static UserGroupInformation getUGI(HttpServletRequest request, + Configuration conf) throws IOException { + return getUGI(null, request, conf); + } + /** * Get {@link UserGroupInformation} and possibly the delegation token out of * the request. + * @param context the ServletContext that is serving this request. * @param request the http request * @return a new user from the request * @throws AccessControlException if the request has no token */ - public static UserGroupInformation getUGI(HttpServletRequest request, - Configuration conf - ) throws IOException { + public static UserGroupInformation getUGI(ServletContext context, + HttpServletRequest request, Configuration conf) throws IOException { UserGroupInformation ugi = null; if(UserGroupInformation.isSecurityEnabled()) { String user = request.getRemoteUser(); @@ -493,12 +529,12 @@ public class JspHelper { Token token = new Token(); token.decodeFromUrlString(tokenString); - InetSocketAddress serviceAddr = NameNode.getAddress(conf); - LOG.info("Setting service in token: " - + new Text(serviceAddr.getAddress().getHostAddress() + ":" - + serviceAddr.getPort())); - token.setService(new Text(serviceAddr.getAddress().getHostAddress() - + ":" + serviceAddr.getPort())); + String serviceAddress = getNNServiceAddress(context, request); + if (serviceAddress != null) { + LOG.info("Setting service in token: " + + new Text(serviceAddress)); + token.setService(new Text(serviceAddress)); + } ByteArrayInputStream buf = new ByteArrayInputStream(token .getIdentifier()); DataInputStream in = new DataInputStream(buf); @@ -549,5 +585,13 @@ public class JspHelper { } } - + /** + * Returns the url parameter for the given bpid string. + * @param name parameter name + * @param val parameter value + * @return url parameter + */ + public static String getUrlParam(String name, String val) { + return val == null ? 
"" : "&" + name + "=" + val; + } } Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1099687&r1=1099686&r2=1099687&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original) +++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu May 5 05:40:07 2011 @@ -78,18 +78,21 @@ public abstract class Storage extends St // last layout version that did not support persistent rbw replicas public static final int PRE_RBW_LAYOUT_VERSION = -19; + // last layout version that is before federation + public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30; + /** Layout versions of 0.20.203 release */ public static final int[] LAYOUT_VERSIONS_203 = {-19, -31}; private static final String STORAGE_FILE_LOCK = "in_use.lock"; protected static final String STORAGE_FILE_VERSION = "VERSION"; public static final String STORAGE_DIR_CURRENT = "current"; - private static final String STORAGE_DIR_PREVIOUS = "previous"; - private static final String STORAGE_TMP_REMOVED = "removed.tmp"; - private static final String STORAGE_TMP_PREVIOUS = "previous.tmp"; - private static final String STORAGE_TMP_FINALIZED = "finalized.tmp"; - private static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp"; - private static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint"; + public static final String STORAGE_DIR_PREVIOUS = "previous"; + public static final String STORAGE_TMP_REMOVED = "removed.tmp"; + public static final String STORAGE_TMP_PREVIOUS = "previous.tmp"; + public static final String STORAGE_TMP_FINALIZED = "finalized.tmp"; + public static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp"; + public static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint"; public enum StorageState { NON_EXISTENT, @@ -115,7 +118,7 @@ public abstract class Storage extends St public boolean isOfType(StorageDirType type); } - private NodeType storageType; // Type of the node using this storage + protected NodeType storageType; // Type of the node using this storage protected List storageDirs = new ArrayList(); private class DirIterator implements Iterator { @@ -208,19 +211,32 @@ public abstract class Storage extends St */ @InterfaceAudience.Private public class StorageDirectory { - File root; // root directory - FileLock lock; // storage lock - StorageDirType dirType; // storage dir type + final File root; // root directory + final boolean useLock; // flag to enable storage lock + final StorageDirType dirType; // storage dir type + FileLock lock; // storage lock public StorageDirectory(File dir) { // default dirType is null - this(dir, null); + this(dir, null, true); } public StorageDirectory(File dir, StorageDirType dirType) { + this(dir, dirType, true); + } + + /** + * Constructor + * @param dir directory corresponding to the storage + * @param dirType storage directory type + * @param useLock true - enables locking on the storage directory and false + * disables locking + */ + public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) { this.root = dir; this.lock = null; this.dirType = dirType; + this.useLock = useLock; } /** @@ -245,22 +261,26 @@ public abstract class Storage extends St public void read() throws IOException { 
read(getVersionFile()); } - public void read(File from) throws IOException { + Properties props = readFrom(from); + getFields(props, this); + } + + public Properties readFrom(File from) throws IOException { RandomAccessFile file = new RandomAccessFile(from, "rws"); FileInputStream in = null; + Properties props = new Properties(); try { in = new FileInputStream(file.getFD()); file.seek(0); - Properties props = new Properties(); props.load(in); - getFields(props, this); } finally { if (in != null) { in.close(); } file.close(); } + return props; } /** @@ -620,6 +640,10 @@ public abstract class Storage extends St * @throws IOException if locking fails */ public void lock() throws IOException { + if (!useLock) { + LOG.info("Locking is disabled"); + return; + } this.lock = tryLock(); if (lock == null) { String msg = "Cannot lock storage " + this.root @@ -676,11 +700,6 @@ public abstract class Storage extends St this.storageType = type; } - protected Storage(NodeType type, int nsID, long cT) { - super(FSConstants.LAYOUT_VERSION, nsID, cT); - this.storageType = type; - } - protected Storage(NodeType type, StorageInfo storageInfo) { super(storageInfo); this.storageType = type; @@ -726,8 +745,9 @@ public abstract class Storage extends St public static void checkVersionUpgradable(int oldVersion) throws IOException { if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) { - String msg = "*********** Upgrade is not supported from this older" + - " version of storage to the current version." + + String msg = "*********** Upgrade is not supported from this " + + " older version " + oldVersion + + " of storage to the current version." + " Please upgrade to " + LAST_UPGRADABLE_HADOOP_VERSION + " or a later version and then upgrade to current" + " version. Old layout version is " + @@ -751,29 +771,11 @@ public abstract class Storage extends St protected void getFields(Properties props, StorageDirectory sd ) throws IOException { - String sv, st, sid, sct; - sv = props.getProperty("layoutVersion"); - st = props.getProperty("storageType"); - sid = props.getProperty("namespaceID"); - sct = props.getProperty("cTime"); - if (sv == null || st == null || sid == null || sct == null) - throw new InconsistentFSStateException(sd.root, - "file " + STORAGE_FILE_VERSION + " is invalid."); - int rv = Integer.parseInt(sv); - NodeType rt = NodeType.valueOf(st); - int rid = Integer.parseInt(sid); - long rct = Long.parseLong(sct); - if (!storageType.equals(rt) || - !((namespaceID == 0) || (rid == 0) || namespaceID == rid)) - throw new InconsistentFSStateException(sd.root, - "is incompatible with others."); - if (rv < FSConstants.LAYOUT_VERSION) // future version - throw new IncorrectVersionException(rv, "storage directory " - + sd.root.getCanonicalPath()); - layoutVersion = rv; - storageType = rt; - namespaceID = rid; - cTime = rct; + setLayoutVersion(props, sd); + setNamespaceID(props, sd); + setStorageType(props, sd); + setcTime(props, sd); + setClusterId(props, layoutVersion, sd); } /** @@ -789,6 +791,10 @@ public abstract class Storage extends St props.setProperty("layoutVersion", String.valueOf(layoutVersion)); props.setProperty("storageType", storageType.toString()); props.setProperty("namespaceID", String.valueOf(namespaceID)); + // Set clusterID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before + if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) { + props.setProperty("clusterID", clusterID); + } props.setProperty("cTime", String.valueOf(cTime)); } @@ -871,10 +877,74 @@ public abstract class Storage extends St 
public static String getRegistrationID(StorageInfo storage) { return "NS-" + Integer.toString(storage.getNamespaceID()) + + "-" + storage.getClusterID() + "-" + Integer.toString(storage.getLayoutVersion()) + "-" + Long.toString(storage.getCTime()); } + String getProperty(Properties props, StorageDirectory sd, + String name) throws InconsistentFSStateException { + String property = props.getProperty(name); + if (property == null) { + throw new InconsistentFSStateException(sd.root, "file " + + STORAGE_FILE_VERSION + " has " + name + " mising."); + } + return property; + } + + /** Validate and set storage type from {@link Properties}*/ + protected void setStorageType(Properties props, StorageDirectory sd) + throws InconsistentFSStateException { + NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType")); + if (!storageType.equals(type)) { + throw new InconsistentFSStateException(sd.root, + "node type is incompatible with others."); + } + storageType = type; + } + + /** Validate and set ctime from {@link Properties}*/ + protected void setcTime(Properties props, StorageDirectory sd) + throws InconsistentFSStateException { + cTime = Long.parseLong(getProperty(props, sd, "cTime")); + } + + /** Validate and set clusterId from {@link Properties}*/ + protected void setClusterId(Properties props, int layoutVersion, + StorageDirectory sd) throws InconsistentFSStateException { + // No Cluster ID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before + if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) { + String cid = getProperty(props, sd, "clusterID"); + if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) { + throw new InconsistentFSStateException(sd.getRoot(), + "cluster Id is incompatible with others."); + } + clusterID = cid; + } + } + + /** Validate and set layout version from {@link Properties}*/ + protected void setLayoutVersion(Properties props, StorageDirectory sd) + throws IncorrectVersionException, InconsistentFSStateException { + int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion")); + if (lv < FSConstants.LAYOUT_VERSION) { // future version + throw new IncorrectVersionException(lv, "storage directory " + + sd.root.getAbsolutePath()); + } + layoutVersion = lv; + } + + /** Validate and set namespaceID version from {@link Properties}*/ + protected void setNamespaceID(Properties props, StorageDirectory sd) + throws InconsistentFSStateException { + int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID")); + if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) { + throw new InconsistentFSStateException(sd.root, + "namespaceID is incompatible with others."); + } + namespaceID = nsId; + } + public static boolean is203LayoutVersion(int layoutVersion) { for (int lv203 : LAYOUT_VERSIONS_203) { if (lv203 == layoutVersion) { Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1099687&r1=1099686&r2=1099687&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original) +++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Thu May 5 05:40:07 2011 @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.io.Writable; - +import org.apache.hadoop.io.WritableUtils; /** * Common class for storage information. @@ -34,14 +34,16 @@ import org.apache.hadoop.io.Writable; public class StorageInfo implements Writable { public int layoutVersion; // layout version of the storage data public int namespaceID; // id of the file system + public String clusterID; // id of the cluster public long cTime; // creation time of the file system state public StorageInfo () { - this(0, 0, 0L); + this(0, 0, "", 0L); } - public StorageInfo(int layoutV, int nsID, long cT) { + public StorageInfo(int layoutV, int nsID, String cid, long cT) { layoutVersion = layoutV; + clusterID = cid; namespaceID = nsID; cTime = cT; } @@ -63,13 +65,19 @@ public class StorageInfo implements Writ public int getNamespaceID() { return namespaceID; } /** + * cluster id of the file system.
+ */ + public String getClusterID() { return clusterID; } + + /** * Creation time of the file system state.
* Modified during upgrades. */ public long getCTime() { return cTime; } - + public void setStorageInfo(StorageInfo from) { layoutVersion = from.layoutVersion; + clusterID = from.clusterID; namespaceID = from.namespaceID; cTime = from.cTime; } @@ -80,12 +88,21 @@ public class StorageInfo implements Writ public void write(DataOutput out) throws IOException { out.writeInt(getLayoutVersion()); out.writeInt(getNamespaceID()); + WritableUtils.writeString(out, clusterID); out.writeLong(getCTime()); } public void readFields(DataInput in) throws IOException { layoutVersion = in.readInt(); namespaceID = in.readInt(); + clusterID = WritableUtils.readString(in); cTime = in.readLong(); } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("lv=").append(layoutVersion).append(";cid=").append(clusterID) + .append(";nsid=").append(namespaceID).append(";c=").append(cTime); + return sb.toString(); + } } Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1099687&r1=1099686&r2=1099687&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu May 5 05:40:07 2011 @@ -36,14 +36,16 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FSOutputSummer; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Daemon; @@ -87,14 +89,14 @@ class BlockReceiver implements Closeable private final boolean isDatanode; /** the block to receive */ - private final Block block; + private final ExtendedBlock block; /** the replica to write */ private final ReplicaInPipelineInterface replicaInfo; /** pipeline stage */ private final BlockConstructionStage stage; private final boolean isTransfer; - BlockReceiver(final Block block, final DataInputStream in, + BlockReceiver(final ExtendedBlock block, final DataInputStream in, final String inAddr, final String myAddr, final BlockConstructionStage stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, @@ -145,14 +147,16 @@ class BlockReceiver implements Closeable case PIPELINE_SETUP_APPEND: replicaInfo = datanode.data.append(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { // remove from block scanner - datanode.blockScanner.deleteBlock(block); + datanode.blockScanner.deleteBlock(block.getBlockPoolId(), + block.getLocalBlock()); } block.setGenerationStamp(newGs); break; case 
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu May 5 05:40:07 2011
@@ -36,14 +36,16 @@ import java.util.zip.Checksum;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -87,14 +89,14 @@ class BlockReceiver implements Closeable
   private final boolean isDatanode;
 
   /** the block to receive */
-  private final Block block;
+  private final ExtendedBlock block;
   /** the replica to write */
   private final ReplicaInPipelineInterface replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
 
-  BlockReceiver(final Block block, final DataInputStream in,
+  BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, final long newGs,
       final long minBytesRcvd, final long maxBytesRcvd,
@@ -145,14 +147,16 @@ class BlockReceiver implements Closeable
     case PIPELINE_SETUP_APPEND:
       replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
       if (datanode.blockScanner != null) { // remove from block scanner
-        datanode.blockScanner.deleteBlock(block);
+        datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
+            block.getLocalBlock());
       }
       block.setGenerationStamp(newGs);
       break;
     case PIPELINE_SETUP_APPEND_RECOVERY:
       replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
       if (datanode.blockScanner != null) { // remove from block scanner
-        datanode.blockScanner.deleteBlock(block);
+        datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
+            block.getLocalBlock());
       }
       block.setGenerationStamp(newGs);
       break;
@@ -267,7 +271,8 @@ class BlockReceiver implements Closeable
    * affect this datanode unless it is caused by interruption.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
-    LOG.info(datanode.dnRegistration + ":Exception writing block " +
+    String bpid = block.getBlockPoolId();
+    LOG.info(datanode.getDNRegistrationForBP(bpid) + ":Exception writing block " +
         block + " to mirror " + mirrorAddr + "\n" +
         StringUtils.stringifyException(ioe));
     if (Thread.interrupted()) { // shut down if the thread is interrupted
@@ -286,6 +291,7 @@ class BlockReceiver implements Closeable
   private void verifyChunks( byte[] dataBuf, int dataOff, int len,
                              byte[] checksumBuf, int checksumOff )
                              throws IOException {
+    DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId());
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
 
@@ -298,7 +304,7 @@ class BlockReceiver implements Closeable
               srcDataNode + " to namenode");
           LocatedBlock lb = new LocatedBlock(block,
               new DatanodeInfo[] {srcDataNode});
-          datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+          nn.reportBadBlocks(new LocatedBlock[] {lb});
         } catch (IOException e) {
           LOG.warn("Failed to report bad block " + block +
               " from datanode " + srcDataNode + " to namenode");
@@ -974,10 +980,12 @@ class BlockReceiver implements Closeable
           datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
           if (ClientTraceLog.isInfoEnabled() && isClient) {
             long offset = 0;
+            DatanodeRegistration dnR =
+              datanode.getDNRegistrationForBP(block.getBlockPoolId());
             ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                   inAddr, myAddr, block.getNumBytes(),
                   "HDFS_WRITE", clientname, offset,
-                  datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+                  dnR.getStorageID(), block, endTime-startTime));
           } else {
             LOG.info("Received block " + block + " of size "
                 + block.getNumBytes() + " from " + inAddr);
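BlockReceiver here (and BlockSender next) now addresses every block as an ExtendedBlock, that is, a block pool ID paired with the per-pool block, and looks up per-pool state such as the DataNode registration and the owning NameNode from that pool ID. The sketch below only illustrates why the pool ID has to travel with the block on a federated DataNode; the classes and values are invented stand-ins, not the real org.apache.hadoop.hdfs.protocol types.

/** Invented stand-in for the per-pool block identity (id, length, generation stamp). */
class LocalBlock {
  final long blockId;
  final long numBytes;
  long generationStamp;

  LocalBlock(long blockId, long numBytes, long generationStamp) {
    this.blockId = blockId;
    this.numBytes = numBytes;
    this.generationStamp = generationStamp;
  }
}

/** Invented stand-in pairing a block pool ID with a local block. */
class ExtendedBlockSketch {
  private final String blockPoolId;   // identifies the owning namespace
  private final LocalBlock localBlock;

  ExtendedBlockSketch(String blockPoolId, LocalBlock localBlock) {
    this.blockPoolId = blockPoolId;
    this.localBlock = localBlock;
  }

  String getBlockPoolId() { return blockPoolId; }
  LocalBlock getLocalBlock() { return localBlock; }
  long getBlockId() { return localBlock.blockId; }

  public static void main(String[] args) {
    // The same numeric block id can exist in two different block pools,
    // so lookups, scans and log lines all need the pool id as well.
    LocalBlock sameId = new LocalBlock(1073741825L, 64 * 1024 * 1024, 1001L);
    ExtendedBlockSketch fromPoolA =
        new ExtendedBlockSketch("BP-1-10.0.0.1-1304500000000", sameId);
    ExtendedBlockSketch fromPoolB =
        new ExtendedBlockSketch("BP-2-10.0.0.2-1304500000000", sameId);
    System.out.println(fromPoolA.getBlockPoolId() + "/" + fromPoolA.getBlockId());
    System.out.println(fromPoolB.getBlockPoolId() + "/" + fromPoolB.getBlockId());
  }
}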
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu May 5 05:40:07 2011
@@ -31,7 +31,7 @@ import java.util.Arrays;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -47,7 +47,7 @@ class BlockSender implements java.io.Clo
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
 
-  private Block block; // the block to read from
+  private ExtendedBlock block; // the block to read from
   /** the replica to read from */
   private final Replica replica;
@@ -83,21 +83,22 @@ class BlockSender implements java.io.Clo
   private volatile ChunkChecksum lastChunkChecksum = null;
 
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode) throws IOException {
     this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
         verifyChecksum, datanode, null);
   }
 
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
       synchronized(datanode.data) {
-        this.replica = datanode.data.getReplica(block.getBlockId());
+        this.replica = datanode.data.getReplica(block.getBlockPoolId(),
+            block.getBlockId());
         if (replica == null) {
           throw new ReplicaNotFoundException(block);
         }
@@ -153,9 +154,8 @@ class BlockSender implements java.io.Clo
       this.clientTraceFmt = clientTraceFmt;
 
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
-        checksumIn = new DataInputStream(
-            new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
-                BUFFER_SIZE));
+        checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
            .getMetaDataInputStream(block), BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
         BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -201,7 +201,8 @@ class BlockSender implements java.io.Clo
           || (length + startOffset) > endOffset) {
         String msg = " Offset " + startOffset + " and length " + length
           + " don't match block " + block + " ( blockLen " + endOffset + " )";
-        LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+        LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
+            ":sendBlock() : " + msg);
         throw new IOException(msg);
       }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java Thu May 5 05:40:07 2011
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
@@ -44,7 +46,7 @@ public interface BlockVolumeChoosingPoli
    * @return the chosen volume to store the block.
    * @throws IOException when disks are unavailable or are full.
    */
-  public FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
+  public FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
     throws IOException;
 
 }
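chooseVolume now receives the candidate volumes as a List instead of an array. One policy that satisfies this contract walks the list round-robin and skips volumes that cannot hold the block; the sketch below is an illustration of the interface only, with an invented Volume interface standing in for FSDataset.FSVolume and a plain IOException for the out-of-space case.

import java.io.IOException;
import java.util.List;

/** Invented stand-in for FSDataset.FSVolume. */
interface Volume {
  long getAvailable() throws IOException;
}

/**
 * Round-robin sketch of a List-based volume choosing policy: start after the
 * volume used last time and return the first volume with room for blockSize bytes.
 */
class RoundRobinVolumeChooser {
  private int curVolume = 0;

  public synchronized Volume chooseVolume(List<? extends Volume> volumes, long blockSize)
      throws IOException {
    if (volumes.isEmpty()) {
      throw new IOException("No volumes available");
    }
    int startVolume = curVolume % volumes.size();
    int i = startVolume;
    do {
      Volume volume = volumes.get(i);
      i = (i + 1) % volumes.size();
      if (volume.getAvailable() >= blockSize) {
        curVolume = i;   // remember where to start on the next call
        return volume;
      }
    } while (i != startVolume);
    throw new IOException("Out of space: no volume has " + blockSize + " bytes free");
  }
}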