Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu May 5 05:40:07 2011
@@ -24,16 +24,14 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
-import java.util.EnumSet;
import java.util.Formatter;
import java.util.HashMap;
import java.util.HashSet;
@@ -46,45 +44,31 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -187,27 +171,19 @@ import org.apache.hadoop.util.ToolRunner
*/
@InterfaceAudience.Private
-public class Balancer implements Tool {
- private static final Log LOG =
- LogFactory.getLog(Balancer.class.getName());
+public class Balancer {
+ static final Log LOG = LogFactory.getLog(Balancer.class);
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+ private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
/** The maximum number of concurrent blocks moves for
* balancing purpose at a datanode
*/
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
- private Configuration conf;
-
- private double threshold = 10D;
- private NamenodeProtocol namenode;
- private ClientProtocol client;
- private FileSystem fs;
- private boolean isBlockTokenEnabled;
- private boolean shouldRun;
- private long keyUpdaterInterval;
- private BlockTokenSecretManager blockTokenSecretManager;
- private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
+ private final NameNodeConnector nnc;
+ private final BalancingPolicy policy;
+ private final double threshold;
private final static Random rnd = new Random();
// all data node lists
@@ -233,8 +209,6 @@ public class Balancer implements Tool {
private NetworkTopology cluster = new NetworkTopology();
- private double avgUtilization = 0.0D;
-
final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -242,6 +216,7 @@ public class Balancer implements Tool {
final private ExecutorService dispatcherExecutor =
Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
+
/* This class keeps track of a scheduled block move */
private class PendingBlockMove {
private BalancerBlock block;
@@ -369,14 +344,9 @@ public class Balancer implements Tool {
/* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out) throws IOException {
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockTokenSecretManager.generateToken(null, block
- .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
- BlockTokenSecretManager.AccessMode.COPY));
- }
- DataTransferProtocol.Sender.opReplaceBlock(out,
- block.getBlock(), source.getStorageID(),
+ final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+ final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+ DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(),
proxySource.getDatanode(), accessToken);
}
@@ -487,30 +457,33 @@ public class Balancer implements Tool {
}
}
- /* Return the utilization of a datanode */
- static private double getUtilization(DatanodeInfo datanode) {
- return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
- }
/* A class that keeps track of a datanode in Balancer */
private static class BalancerDatanode {
final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
- protected DatanodeInfo datanode;
- private double utilization;
- protected long maxSizeToMove;
+ final DatanodeInfo datanode;
+ final double utilization;
+ final long maxSize2Move;
protected long scheduledSize = 0L;
// blocks being moved but not confirmed yet
private List<PendingBlockMove> pendingBlocks =
new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[" + getName()
+ + ", utilization=" + utilization + "]";
+ }
+
/* Constructor
* Depending on avgutil & threshold, calculate maximum bytes to move
*/
- private BalancerDatanode(
- DatanodeInfo node, double avgUtil, double threshold) {
+ private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
datanode = node;
- utilization = Balancer.getUtilization(node);
-
+ utilization = policy.getUtilization(node);
+ final double avgUtil = policy.getAvgUtilization();
+ long maxSizeToMove;
+
if (utilization >= avgUtil+threshold
|| utilization <= avgUtil-threshold) {
maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
@@ -521,7 +494,7 @@ public class Balancer implements Tool {
if (utilization < avgUtil ) {
maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
}
- maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+ this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
}
/** Get the datanode */
@@ -541,12 +514,12 @@ public class Balancer implements Tool {
/** Decide if still need to move more bytes */
protected boolean isMoveQuotaFull() {
- return scheduledSize<maxSizeToMove;
+ return scheduledSize<maxSize2Move;
}
/** Return the total number of bytes that need to be moved */
protected long availableSizeToMove() {
- return maxSizeToMove-scheduledSize;
+ return maxSize2Move-scheduledSize;
}
/* increment scheduled size */
@@ -604,8 +577,8 @@ public class Balancer implements Tool {
= new ArrayList<BalancerBlock>();
/* constructor */
- private Source(DatanodeInfo node, double avgUtil, double threshold) {
- super(node, avgUtil, threshold);
+ private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+ super(node, policy, threshold);
}
/** Add a node task */
@@ -626,7 +599,7 @@ public class Balancer implements Tool {
* Return the total size of the received blocks in the number of bytes.
*/
private long getBlockList() throws IOException {
- BlockWithLocations[] newBlocks = namenode.getBlocks(datanode,
+ BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode,
Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks) {
@@ -780,160 +753,25 @@ public class Balancer implements Tool {
/* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode.
*/
- private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+ private static void checkReplicationPolicyCompatibility(Configuration conf
+ ) throws UnsupportedActionException {
if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
BlockPlacementPolicyDefault.class) {
throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
}
}
-
- /** Default constructor */
- Balancer() throws UnsupportedActionException {
- }
-
- /** Construct a balancer from the given configuration */
- Balancer(Configuration conf) throws UnsupportedActionException {
- checkReplicationPolicyCompatibility(conf);
- setConf(conf);
- }
-
- /** Construct a balancer from the given configuration and threshold */
- Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
- checkReplicationPolicyCompatibility(conf);
- setConf(conf);
- this.threshold = threshold;
- }
/**
- * Run a balancer
- * @param args
- */
- public static void main(String[] args) {
- try {
- System.exit( ToolRunner.run(null, new Balancer(), args) );
- } catch (Throwable e) {
- LOG.error(StringUtils.stringifyException(e));
- System.exit(-1);
- }
-
- }
-
- private static void printUsage() {
- System.out.println("Usage: java Balancer");
- System.out.println(" [-threshold <threshold>]\t"
- +"percentage of disk capacity");
- }
-
- /* parse argument to get the threshold */
- private double parseArgs(String[] args) {
- double threshold=0;
- int argsLen = (args == null) ? 0 : args.length;
- if (argsLen==0) {
- threshold = 10;
- } else {
- if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
- printUsage();
- throw new IllegalArgumentException(Arrays.toString(args));
- } else {
- try {
- threshold = Double.parseDouble(args[1]);
- if (threshold < 0 || threshold >100) {
- throw new NumberFormatException();
- }
- LOG.info( "Using a threshold of " + threshold );
- } catch(NumberFormatException e) {
- System.err.println(
- "Expect a double parameter in the range of [0, 100]: "+ args[1]);
- printUsage();
- throw e;
- }
- }
- }
- return threshold;
- }
-
- /* Initialize balancer. It sets the value of the threshold, and
+ * Construct a balancer.
+ * Initialize balancer. It sets the value of the threshold, and
* builds the communication proxies to
* namenode as a client and a secondary namenode and retry proxies
* when connection fails.
*/
- private void init(double threshold) throws IOException {
- this.threshold = threshold;
- this.namenode = createNamenode(conf);
- this.client = DFSClient.createNamenode(conf);
- this.fs = FileSystem.get(conf);
- ExportedBlockKeys keys = namenode.getBlockKeys();
- this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
- if (isBlockTokenEnabled) {
- long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
- long blockTokenLifetime = keys.getTokenLifetime();
- LOG.info("Block token params received from NN: keyUpdateInterval="
- + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + blockTokenLifetime / (60 * 1000) + " min(s)");
- this.blockTokenSecretManager = new BlockTokenSecretManager(false,
- blockKeyUpdateInterval, blockTokenLifetime);
- this.blockTokenSecretManager.setKeys(keys);
- /*
- * Balancer should sync its block keys with NN more frequently than NN
- * updates its block keys
- */
- this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
- LOG.info("Balancer will update its block keys every "
- + keyUpdaterInterval / (60 * 1000) + " minute(s)");
- this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
- this.shouldRun = true;
- this.keyupdaterthread.start();
- }
- }
-
- /**
- * Periodically updates access keys.
- */
- class BlockKeyUpdater implements Runnable {
-
- public void run() {
- while (shouldRun) {
- try {
- blockTokenSecretManager.setKeys(namenode.getBlockKeys());
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- try {
- Thread.sleep(keyUpdaterInterval);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
- /* Build a NamenodeProtocol connection to the namenode and
- * set up the retry policy */
- private static NamenodeProtocol createNamenode(Configuration conf)
- throws IOException {
- InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);
- RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
- 5, 200, TimeUnit.MILLISECONDS);
- Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- RetryPolicy methodPolicy = RetryPolicies.retryByException(
- timeoutPolicy, exceptionToPolicyMap);
- Map<String,RetryPolicy> methodNameToPolicyMap =
- new HashMap<String, RetryPolicy>();
- methodNameToPolicyMap.put("getBlocks", methodPolicy);
- methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
- UserGroupInformation ugi;
- ugi = UserGroupInformation.getCurrentUser();
-
- return (NamenodeProtocol) RetryProxy.create(
- NamenodeProtocol.class,
- RPC.getProxy(NamenodeProtocol.class,
- NamenodeProtocol.versionID,
- nameNodeAddr,
- ugi,
- conf,
- NetUtils.getDefaultSocketFactory(conf)),
- methodNameToPolicyMap);
+ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+ this.threshold = p.threshold;
+ this.policy = p.policy;
+ this.nnc = theblockpool;
}
/* Shuffle datanode array */
@@ -946,13 +784,6 @@ public class Balancer implements Tool {
}
}
- /* get all live datanodes of a cluster and their disk usage
- * decide the number of bytes need to be moved
- */
- private long initNodes() throws IOException {
- return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
- }
-
/* Given a data node set, build a network topology and decide
* over-utilized datanodes, above average utilized datanodes,
* below average utilized datanodes, and underutilized datanodes.
@@ -968,15 +799,13 @@ public class Balancer implements Tool {
*/
private long initNodes(DatanodeInfo[] datanodes) {
// compute average utilization
- long totalCapacity=0L, totalUsedSpace=0L;
for (DatanodeInfo datanode : datanodes) {
if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
continue; // ignore decommissioning or decommissioned nodes
}
- totalCapacity += datanode.getCapacity();
- totalUsedSpace += datanode.getDfsUsed();
+ policy.accumulateSpaces(datanode);
}
- this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+ policy.initAvgUtilization();
/*create network topology and all data node lists:
* overloaded, above-average, below-average, and underloaded
@@ -991,19 +820,20 @@ public class Balancer implements Tool {
}
cluster.add(datanode);
BalancerDatanode datanodeS;
- if (getUtilization(datanode) > avgUtilization) {
- datanodeS = new Source(datanode, avgUtilization, threshold);
+ final double avg = policy.getAvgUtilization();
+ if (policy.getUtilization(datanode) > avg) {
+ datanodeS = new Source(datanode, policy, threshold);
if (isAboveAvgUtilized(datanodeS)) {
this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
} else {
assert(isOverUtilized(datanodeS)) :
datanodeS.getName()+ "is not an overUtilized node";
this.overUtilizedDatanodes.add((Source)datanodeS);
- overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+ overLoadedBytes += (long)((datanodeS.utilization-avg
-threshold)*datanodeS.datanode.getCapacity()/100.0);
}
} else {
- datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+ datanodeS = new BalancerDatanode(datanode, policy, threshold);
if ( isBelowOrEqualAvgUtilized(datanodeS)) {
this.belowAvgUtilizedDatanodes.add(datanodeS);
} else {
@@ -1011,7 +841,7 @@ public class Balancer implements Tool {
+ datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
+ ", utilization=" + datanodeS.utilization;
this.underUtilizedDatanodes.add(datanodeS);
- underLoadedBytes += (long)((avgUtilization-threshold-
+ underLoadedBytes += (long)((avg-threshold-
datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
}
}
@@ -1019,7 +849,7 @@ public class Balancer implements Tool {
}
//logging
- logImbalancedNodes();
+ logNodes();
assert (this.datanodes.size() ==
overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1031,25 +861,20 @@ public class Balancer implements Tool {
}
/* log the over utilized & under utilized nodes */
- private void logImbalancedNodes() {
- StringBuilder msg = new StringBuilder();
- msg.append(overUtilizedDatanodes.size());
- msg.append(" over utilized nodes:");
- for (Source node : overUtilizedDatanodes) {
- msg.append( " " );
- msg.append( node.getName() );
- }
- LOG.info(msg);
- msg = new StringBuilder();
- msg.append(underUtilizedDatanodes.size());
- msg.append(" under utilized nodes: ");
- for (BalancerDatanode node : underUtilizedDatanodes) {
- msg.append( " " );
- msg.append( node.getName() );
- }
- LOG.info(msg);
+ private void logNodes() {
+ logNodes("over-utilized", overUtilizedDatanodes);
+ if (LOG.isTraceEnabled()) {
+ logNodes("above-average", aboveAvgUtilizedDatanodes);
+ logNodes("below-average", belowAvgUtilizedDatanodes);
+ }
+ logNodes("underutilized", underUtilizedDatanodes);
}
-
+
+ private static <T extends BalancerDatanode> void logNodes(
+ String name, Collection<T> nodes) {
+ LOG.info(nodes.size() + " " + name + ": " + nodes);
+ }
+
/* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
* Maximum bytes to be moved per node is
@@ -1313,7 +1138,6 @@ public class Balancer implements Tool {
*/
private static class MovedBlocks {
private long lastCleanupTime = System.currentTimeMillis();
- private static long winWidth = 5400*1000L; // 1.5 hour
final private static int CUR_WIN = 0;
final private static int OLD_WIN = 1;
final private static int NUM_WINS = 2;
@@ -1326,13 +1150,6 @@ public class Balancer implements Tool {
movedBlocks.add(new HashMap<Block,BalancerBlock>());
}
- /* set the win width */
- private void setWinWidth(Configuration conf) {
- winWidth = conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
- }
-
/* add a block thus marking a block to be moved */
synchronized private void add(BalancerBlock block) {
movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
@@ -1353,7 +1170,7 @@ public class Balancer implements Tool {
synchronized private void cleanup() {
long curTime = System.currentTimeMillis();
// check if old win is older than winWidth
- if (lastCleanupTime + winWidth <= curTime) {
+ if (lastCleanupTime + WIN_WIDTH <= curTime) {
// purge the old window
movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
@@ -1419,7 +1236,7 @@ public class Balancer implements Tool {
this.datanodes.clear();
this.sources.clear();
this.targets.clear();
- this.avgUtilization = 0.0D;
+ this.policy.reset();
cleanGlobalBlockList();
this.movedBlocks.cleanup();
}
@@ -1439,182 +1256,172 @@ public class Balancer implements Tool {
/* Return true if the given datanode is overUtilized */
private boolean isOverUtilized(BalancerDatanode datanode) {
- return datanode.utilization > (avgUtilization+threshold);
+ return datanode.utilization > (policy.getAvgUtilization()+threshold);
}
/* Return true if the given datanode is above average utilized
* but not overUtilized */
private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
- return (datanode.utilization <= (avgUtilization+threshold))
- && (datanode.utilization > avgUtilization);
+ final double avg = policy.getAvgUtilization();
+ return (datanode.utilization <= (avg+threshold))
+ && (datanode.utilization > avg);
}
/* Return true if the given datanode is underUtilized */
private boolean isUnderUtilized(BalancerDatanode datanode) {
- return datanode.utilization < (avgUtilization-threshold);
+ return datanode.utilization < (policy.getAvgUtilization()-threshold);
}
/* Return true if the given datanode is below average utilized
* but not underUtilized */
private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
- return (datanode.utilization >= (avgUtilization-threshold))
- && (datanode.utilization <= avgUtilization);
+ final double avg = policy.getAvgUtilization();
+ return (datanode.utilization >= (avg-threshold))
+ && (datanode.utilization <= avg);
}
// Exit status
- final public static int SUCCESS = 1;
- final public static int ALREADY_RUNNING = -1;
- final public static int NO_MOVE_BLOCK = -2;
- final public static int NO_MOVE_PROGRESS = -3;
- final public static int IO_EXCEPTION = -4;
- final public static int ILLEGAL_ARGS = -5;
- /** main method of Balancer
- * @param args arguments to a Balancer
- * @throws Exception exception that occured during datanode balancing
- */
- public int run(String[] args) throws Exception {
- long startTime = Util.now();
- OutputStream out = null;
+ enum ReturnStatus {
+ SUCCESS(1),
+ IN_PROGRESS(0),
+ ALREADY_RUNNING(-1),
+ NO_MOVE_BLOCK(-2),
+ NO_MOVE_PROGRESS(-3),
+ IO_EXCEPTION(-4),
+ ILLEGAL_ARGS(-5),
+ INTERRUPTED(-6);
+
+ final int code;
+
+ ReturnStatus(int code) {
+ this.code = code;
+ }
+ }
+
+ /** Run an iteration for all datanodes. */
+ private ReturnStatus run(int iteration, Formatter formatter) {
try {
- // initialize a balancer
- init(parseArgs(args));
+ /* Get all live datanodes of a cluster and their disk usage, then
+ * decide the number of bytes that need to be moved.
+ */
+ final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+ if (bytesLeftToMove == 0) {
+ System.out.println("The cluster is balanced. Exiting...");
+ return ReturnStatus.SUCCESS;
+ } else {
+ LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ + " to make the cluster balanced." );
+ }
- /* Check if there is another balancer running.
- * Exit if there is another one running.
+ /* Decide all the nodes that will participate in the block move and
+ * the number of bytes that need to be moved from one node to another
+ * in this iteration. Maximum bytes to be moved per node is
+ * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
*/
- out = checkAndMarkRunningBalancer();
- if (out == null) {
- System.out.println("Another balancer is running. Exiting...");
- return ALREADY_RUNNING;
+ final long bytesToMove = chooseNodes();
+ if (bytesToMove == 0) {
+ System.out.println("No block can be moved. Exiting...");
+ return ReturnStatus.NO_MOVE_BLOCK;
+ } else {
+ LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
+ " in this iteration");
}
- Formatter formatter = new Formatter(System.out);
- System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
- int iterations = 0;
- while (true ) {
- /* get all live datanodes of a cluster and their disk usage
- * decide the number of bytes need to be moved
- */
- long bytesLeftToMove = initNodes();
- if (bytesLeftToMove == 0) {
- System.out.println("The cluster is balanced. Exiting...");
- return SUCCESS;
- } else {
- LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
- +" bytes to make the cluster balanced." );
- }
-
- /* Decide all the nodes that will participate in the block move and
- * the number of bytes that need to be moved from one node to another
- * in this iteration. Maximum bytes to be moved per node is
- * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
- */
- long bytesToMove = chooseNodes();
- if (bytesToMove == 0) {
- System.out.println("No block can be moved. Exiting...");
- return NO_MOVE_BLOCK;
- } else {
- LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
- "bytes in this iteration");
- }
-
- formatter.format("%-24s %10d %19s %18s %17s\n",
- DateFormat.getDateTimeInstance().format(new Date()),
- iterations,
- StringUtils.byteDesc(bytesMoved.get()),
- StringUtils.byteDesc(bytesLeftToMove),
- StringUtils.byteDesc(bytesToMove)
- );
-
- /* For each pair of <source, target>, start a thread that repeatedly
- * decide a block to be moved and its proxy source,
- * then initiates the move until all bytes are moved or no more block
- * available to move.
- * Exit no byte has been moved for 5 consecutive iterations.
- */
- if (dispatchBlockMoves() > 0) {
- notChangedIterations = 0;
- } else {
- notChangedIterations++;
- if (notChangedIterations >= 5) {
- System.out.println(
- "No block has been moved for 5 iterations. Exiting...");
- return NO_MOVE_PROGRESS;
- }
- }
-
- // clean all lists
- resetData();
-
- try {
- Thread.sleep(2000*conf.getLong(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT));
- } catch (InterruptedException ignored) {
+ formatter.format("%-24s %10d %19s %18s %17s\n",
+ DateFormat.getDateTimeInstance().format(new Date()),
+ iteration,
+ StringUtils.byteDesc(bytesMoved.get()),
+ StringUtils.byteDesc(bytesLeftToMove),
+ StringUtils.byteDesc(bytesToMove)
+ );
+
+ /* For each pair of <source, target>, start a thread that repeatedly
+ * decide a block to be moved and its proxy source,
+ * then initiates the move until all bytes are moved or no more block
+ * available to move.
+ * Exit if no byte has been moved for 5 consecutive iterations.
+ */
+ if (dispatchBlockMoves() > 0) {
+ notChangedIterations = 0;
+ } else {
+ notChangedIterations++;
+ if (notChangedIterations >= 5) {
+ System.out.println(
+ "No block has been moved for 5 iterations. Exiting...");
+ return ReturnStatus.NO_MOVE_PROGRESS;
}
-
- iterations++;
}
- } catch (IllegalArgumentException ae) {
- return ILLEGAL_ARGS;
+
+ // clean all lists
+ resetData();
+ return ReturnStatus.IN_PROGRESS;
+ } catch (IllegalArgumentException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.ILLEGAL_ARGS;
} catch (IOException e) {
- System.out.println("Received an IO exception: " + e.getMessage() +
- " . Exiting...");
- return IO_EXCEPTION;
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.IO_EXCEPTION;
+ } catch (InterruptedException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.INTERRUPTED;
} finally {
// shutdown thread pools
dispatcherExecutor.shutdownNow();
moverExecutor.shutdownNow();
-
- shouldRun = false;
- try {
- if (keyupdaterthread != null) keyupdaterthread.interrupt();
- } catch (Exception e) {
- LOG.warn("Exception shutting down access key updater thread", e);
- }
- // close the output file
- IOUtils.closeStream(out);
- if (fs != null) {
- try {
- fs.delete(BALANCER_ID_PATH, true);
- } catch(IOException ignored) {
- }
- }
- System.out.println("Balancing took " +
- time2Str(Util.now()-startTime));
}
}
- private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
- /* The idea for making sure that there is no more than one balancer
- * running in an HDFS is to create a file in the HDFS, writes the IP address
- * of the machine on which the balancer is running to the file, but did not
- * close the file until the balancer exits.
- * This prevents the second balancer from running because it can not
- * creates the file while the first one is running.
- *
- * This method checks if there is any running balancer and
- * if no, mark yes if no.
- * Note that this is an atomic operation.
- *
- * Return null if there is a running balancer; otherwise the output stream
- * to the newly created file.
+ /**
+ * Balance all namenodes.
+ * For each iteration,
+ * for each namenode,
+ * execute a {@link Balancer} to work through all datanodes once.
*/
- private OutputStream checkAndMarkRunningBalancer() throws IOException {
+ static int run(List<InetSocketAddress> namenodes, final Parameters p,
+ Configuration conf) throws IOException, InterruptedException {
+ final long sleeptime = 2000*conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+ LOG.info("namenodes = " + namenodes);
+ LOG.info("p = " + p);
+
+ final Formatter formatter = new Formatter(System.out);
+ System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
+
+ final List<NameNodeConnector> connectors
+ = new ArrayList<NameNodeConnector>(namenodes.size());
try {
- DataOutputStream out = fs.create(BALANCER_ID_PATH);
- out. writeBytes(InetAddress.getLocalHost().getHostName());
- out.flush();
- return out;
- } catch(RemoteException e) {
- if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
- return null;
- } else {
- throw e;
+ for(InetSocketAddress isa : namenodes) {
+ connectors.add(new NameNodeConnector(isa, conf));
+ }
+
+ boolean done = false;
+ for(int iteration = 0; !done; iteration++) {
+ done = true;
+ Collections.shuffle(connectors);
+ for(NameNodeConnector nnc : connectors) {
+ final Balancer b = new Balancer(nnc, p, conf);
+ final ReturnStatus r = b.run(iteration, formatter);
+ if (r == ReturnStatus.IN_PROGRESS) {
+ done = false;
+ } else if (r != ReturnStatus.SUCCESS) {
+ //must be an error status, return.
+ return r.code;
+ }
+ }
+
+ if (!done) {
+ Thread.sleep(sleeptime);
+ }
+ }
+ } finally {
+ for(NameNodeConnector nnc : connectors) {
+ nnc.close();
}
}
+ return ReturnStatus.SUCCESS.code;
}
-
+
/* Given elaspedTime in ms, return a printable string */
private static String time2Str(long elapsedTime) {
String unit;
@@ -1635,15 +1442,116 @@ public class Balancer implements Tool {
return time+" "+unit;
}
- /** return this balancer's configuration */
- public Configuration getConf() {
- return conf;
+ static class Parameters {
+ static final Parameters DEFALUT = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 10.0);
+
+ final BalancingPolicy policy;
+ final double threshold;
+
+ Parameters(BalancingPolicy policy, double threshold) {
+ this.policy = policy;
+ this.threshold = threshold;
+ }
+
+ @Override
+ public String toString() {
+ return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
+ + "[" + policy + ", threshold=" + threshold + "]";
+ }
}
- /** set this balancer's configuration */
- public void setConf(Configuration conf) {
- this.conf = conf;
- movedBlocks.setWinWidth(conf);
+ static class Cli extends Configured implements Tool {
+ /** Parse arguments and then run Balancer */
+ @Override
+ public int run(String[] args) {
+ final long startTime = Util.now();
+ final Configuration conf = getConf();
+ WIN_WIDTH = conf.getLong(
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+
+ try {
+ checkReplicationPolicyCompatibility(conf);
+
+ final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
+ return Balancer.run(namenodes, parse(args), conf);
+ } catch (IOException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.IO_EXCEPTION.code;
+ } catch (InterruptedException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.INTERRUPTED.code;
+ } finally {
+ System.out.println("Balancing took " + time2Str(Util.now()-startTime));
+ }
+ }
+
+ /** parse command line arguments */
+ static Parameters parse(String[] args) {
+ BalancingPolicy policy = Parameters.DEFALUT.policy;
+ double threshold = Parameters.DEFALUT.threshold;
+
+ if (args != null) {
+ try {
+ for(int i = 0; i < args.length; i++) {
+ if ("-threshold".equalsIgnoreCase(args[i])) {
+ i++;
+ try {
+ threshold = Double.parseDouble(args[i]);
+ if (threshold < 0 || threshold > 100) {
+ throw new NumberFormatException(
+ "Number out of range: threshold = " + threshold);
+ }
+ LOG.info( "Using a threshold of " + threshold );
+ } catch(NumberFormatException e) {
+ System.err.println(
+ "Expecting a number in the range of [0.0, 100.0]: "
+ + args[i]);
+ throw e;
+ }
+ } else if ("-policy".equalsIgnoreCase(args[i])) {
+ i++;
+ try {
+ policy = BalancingPolicy.parse(args[i]);
+ } catch(IllegalArgumentException e) {
+ System.err.println("Illegal policy name: " + args[i]);
+ throw e;
+ }
+ } else {
+ throw new IllegalArgumentException("args = "
+ + Arrays.toString(args));
+ }
+ }
+ } catch(RuntimeException e) {
+ printUsage();
+ throw e;
+ }
+ }
+
+ return new Parameters(policy, threshold);
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: java " + Balancer.class.getSimpleName());
+ System.out.println(" [-policy <policy>]\tthe balancing policy: "
+ + BalancingPolicy.Node.INSTANCE.getName() + " or "
+ + BalancingPolicy.Pool.INSTANCE.getName());
+ System.out.println(
+ " [-threshold <threshold>]\tPercentage of disk capacity");
+ }
}
+ /**
+ * Run a balancer
+ * @param args Command line arguments
+ */
+ public static void main(String[] args) {
+ try {
+ System.exit(ToolRunner.run(null, new Cli(), args));
+ } catch (Throwable e) {
+ LOG.error(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu May 5 05:40:07 2011
@@ -41,6 +41,8 @@ public interface HdfsConstants {
/** Startup options */
static public enum StartupOption{
FORMAT ("-format"),
+ CLUSTERID ("-clusterid"),
+ GENCLUSTERID ("-genclusterid"),
REGULAR ("-regular"),
BACKUP ("-backup"),
CHECKPOINT("-checkpoint"),
@@ -50,6 +52,10 @@ public interface HdfsConstants {
IMPORT ("-importCheckpoint");
private String name = null;
+
+ // Used only with format and upgrade options
+ private String clusterId = null;
+
private StartupOption(String arg) {this.name = arg;}
public String getName() {return name;}
public NamenodeRole toNodeRole() {
@@ -62,7 +68,14 @@ public interface HdfsConstants {
return NamenodeRole.ACTIVE;
}
}
-
+
+ public void setClusterId(String cid) {
+ clusterId = cid;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
}
// Timeouts for communicating with DataNode for streaming writes/reads
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu May 5 05:40:07 2011
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Random;
import java.util.TreeSet;
+import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.jsp.JspWriter;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.BlockReade
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -66,6 +68,7 @@ public class JspHelper {
public static final String CURRENT_CONF = "current.conf";
final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
public static final String DELEGATION_PARAMETER_NAME = "delegation";
+ public static final String NAMENODE_ADDRESS = "nnaddr";
static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
"=";
private static final Log LOG = LogFactory.getLog(JspHelper.class);
@@ -181,7 +184,7 @@ public class JspHelper {
return chosenNode;
}
- public static void streamBlockInAscii(InetSocketAddress addr,
+ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, Configuration conf) throws IOException {
@@ -193,9 +196,9 @@ public class JspHelper {
long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
// Use the block name for file name.
- String file = BlockReader.getFileName(addr, blockId);
+ String file = BlockReader.getFileName(addr, poolId, blockId);
BlockReader blockReader = BlockReader.newBlockReader(s, file,
- new Block(blockId, 0, genStamp), blockToken,
+ new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
byte[] buf = new byte[(int)amtToRead];
@@ -359,14 +362,16 @@ public class JspHelper {
public static void printPathWithLinks(String dir, JspWriter out,
int namenodeInfoPort,
- String tokenString
+ String tokenString,
+ String nnAddress
) throws IOException {
try {
String[] parts = dir.split(Path.SEPARATOR);
StringBuilder tempPath = new StringBuilder(dir.length());
out.print("<a href=\"browseDirectory.jsp" + "?dir="+ Path.SEPARATOR
+ "&namenodeInfoPort=" + namenodeInfoPort
- + getDelegationTokenUrlParam(tokenString) + "\">" + Path.SEPARATOR
+ + getDelegationTokenUrlParam(tokenString)
+ + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR
+ "</a>");
tempPath.append(Path.SEPARATOR);
for (int i = 0; i < parts.length-1; i++) {
@@ -374,7 +379,8 @@ public class JspHelper {
tempPath.append(parts[i]);
out.print("<a href=\"browseDirectory.jsp" + "?dir="
+ tempPath.toString() + "&namenodeInfoPort=" + namenodeInfoPort
- + getDelegationTokenUrlParam(tokenString));
+ + getDelegationTokenUrlParam(tokenString)
+ + getUrlParam(NAMENODE_ADDRESS, nnAddress));
out.print("\">" + parts[i] + "</a>" + Path.SEPARATOR);
tempPath.append(Path.SEPARATOR);
}
@@ -391,7 +397,8 @@ public class JspHelper {
public static void printGotoForm(JspWriter out,
int namenodeInfoPort,
String tokenString,
- String file) throws IOException {
+ String file,
+ String nnAddress) throws IOException {
out.print("<form action=\"browseDirectory.jsp\" method=\"get\" name=\"goto\">");
out.print("Goto : ");
out.print("<input name=\"dir\" type=\"text\" width=\"50\" id\"dir\" value=\""+ file+"\">");
@@ -402,6 +409,8 @@ public class JspHelper {
out.print("<input name=\"" + DELEGATION_PARAMETER_NAME
+ "\" type=\"hidden\" value=\"" + tokenString + "\">");
}
+ out.print("<input name=\""+ NAMENODE_ADDRESS +"\" type=\"hidden\" "
+ + "value=\"" + nnAddress + "\">");
out.print("</form>");
}
@@ -475,16 +484,43 @@ public class JspHelper {
return UserGroupInformation.createRemoteUser(strings[0]);
}
+ private static String getNNServiceAddress(ServletContext context,
+ HttpServletRequest request) {
+ String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
+ InetSocketAddress namenodeAddress = null;
+ if (namenodeAddressInUrl != null) {
+ namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl);
+ } else if (context != null) {
+ namenodeAddress = (InetSocketAddress) context
+ .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+ }
+ if (namenodeAddress != null) {
+ return (namenodeAddress.getAddress().getHostAddress() + ":"
+ + namenodeAddress.getPort());
+ }
+ return null;
+ }
+
+ /**
+ * Same as
+ * {@link JspHelper#getUGI(ServletContext, HttpServletRequest, Configuration)}
+ * with the ServletContext passed as null.
+ */
+ public static UserGroupInformation getUGI(HttpServletRequest request,
+ Configuration conf) throws IOException {
+ return getUGI(null, request, conf);
+ }
+
/**
* Get {@link UserGroupInformation} and possibly the delegation token out of
* the request.
+ * @param context the ServletContext that is serving this request.
* @param request the http request
* @return a new user from the request
* @throws AccessControlException if the request has no token
*/
- public static UserGroupInformation getUGI(HttpServletRequest request,
- Configuration conf
- ) throws IOException {
+ public static UserGroupInformation getUGI(ServletContext context,
+ HttpServletRequest request, Configuration conf) throws IOException {
UserGroupInformation ugi = null;
if(UserGroupInformation.isSecurityEnabled()) {
String user = request.getRemoteUser();
@@ -493,12 +529,12 @@ public class JspHelper {
Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(tokenString);
- InetSocketAddress serviceAddr = NameNode.getAddress(conf);
- LOG.info("Setting service in token: "
- + new Text(serviceAddr.getAddress().getHostAddress() + ":"
- + serviceAddr.getPort()));
- token.setService(new Text(serviceAddr.getAddress().getHostAddress()
- + ":" + serviceAddr.getPort()));
+ String serviceAddress = getNNServiceAddress(context, request);
+ if (serviceAddress != null) {
+ LOG.info("Setting service in token: "
+ + new Text(serviceAddress));
+ token.setService(new Text(serviceAddress));
+ }
ByteArrayInputStream buf = new ByteArrayInputStream(token
.getIdentifier());
DataInputStream in = new DataInputStream(buf);
@@ -549,5 +585,13 @@ public class JspHelper {
}
}
-
+ /**
+ * Returns the url parameter "&name=val", or an empty string if val is null.
+ * @param name parameter name
+ * @param val parameter value
+ * @return url parameter
+ */
+ public static String getUrlParam(String name, String val) {
+ return val == null ? "" : "&" + name + "=" + val;
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu May 5 05:40:07 2011
@@ -78,18 +78,21 @@ public abstract class Storage extends St
// last layout version that did not support persistent rbw replicas
public static final int PRE_RBW_LAYOUT_VERSION = -19;
+ // last layout version that is before federation
+ public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30;
+
/** Layout versions of 0.20.203 release */
public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
- private static final String STORAGE_DIR_PREVIOUS = "previous";
- private static final String STORAGE_TMP_REMOVED = "removed.tmp";
- private static final String STORAGE_TMP_PREVIOUS = "previous.tmp";
- private static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
- private static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
- private static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
+ public static final String STORAGE_DIR_PREVIOUS = "previous";
+ public static final String STORAGE_TMP_REMOVED = "removed.tmp";
+ public static final String STORAGE_TMP_PREVIOUS = "previous.tmp";
+ public static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
+ public static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
+ public static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
public enum StorageState {
NON_EXISTENT,
@@ -115,7 +118,7 @@ public abstract class Storage extends St
public boolean isOfType(StorageDirType type);
}
- private NodeType storageType; // Type of the node using this storage
+ protected NodeType storageType; // Type of the node using this storage
protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
private class DirIterator implements Iterator<StorageDirectory> {
@@ -208,19 +211,32 @@ public abstract class Storage extends St
*/
@InterfaceAudience.Private
public class StorageDirectory {
- File root; // root directory
- FileLock lock; // storage lock
- StorageDirType dirType; // storage dir type
+ final File root; // root directory
+ final boolean useLock; // flag to enable storage lock
+ final StorageDirType dirType; // storage dir type
+ FileLock lock; // storage lock
public StorageDirectory(File dir) {
// default dirType is null
- this(dir, null);
+ this(dir, null, true);
}
public StorageDirectory(File dir, StorageDirType dirType) {
+ this(dir, dirType, true);
+ }
+
+ /**
+ * Constructor
+ * @param dir directory corresponding to the storage
+ * @param dirType storage directory type
+ * @param useLock true - enables locking on the storage directory and false
+ * disables locking
+ */
+ public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) {
this.root = dir;
this.lock = null;
this.dirType = dirType;
+ this.useLock = useLock;
}
/**
@@ -245,22 +261,26 @@ public abstract class Storage extends St
public void read() throws IOException {
read(getVersionFile());
}
-
public void read(File from) throws IOException {
+ Properties props = readFrom(from);
+ getFields(props, this);
+ }
+
+ public Properties readFrom(File from) throws IOException {
RandomAccessFile file = new RandomAccessFile(from, "rws");
FileInputStream in = null;
+ Properties props = new Properties();
try {
in = new FileInputStream(file.getFD());
file.seek(0);
- Properties props = new Properties();
props.load(in);
- getFields(props, this);
} finally {
if (in != null) {
in.close();
}
file.close();
}
+ return props;
}
/**
@@ -620,6 +640,10 @@ public abstract class Storage extends St
* @throws IOException if locking fails
*/
public void lock() throws IOException {
+ if (!useLock) {
+ LOG.info("Locking is disabled");
+ return;
+ }
this.lock = tryLock();
if (lock == null) {
String msg = "Cannot lock storage " + this.root
@@ -676,11 +700,6 @@ public abstract class Storage extends St
this.storageType = type;
}
- protected Storage(NodeType type, int nsID, long cT) {
- super(FSConstants.LAYOUT_VERSION, nsID, cT);
- this.storageType = type;
- }
-
protected Storage(NodeType type, StorageInfo storageInfo) {
super(storageInfo);
this.storageType = type;
@@ -726,8 +745,9 @@ public abstract class Storage extends St
public static void checkVersionUpgradable(int oldVersion)
throws IOException {
if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) {
- String msg = "*********** Upgrade is not supported from this older" +
- " version of storage to the current version." +
+ String msg = "*********** Upgrade is not supported from this " +
+ " older version " + oldVersion +
+ " of storage to the current version." +
" Please upgrade to " + LAST_UPGRADABLE_HADOOP_VERSION +
" or a later version and then upgrade to current" +
" version. Old layout version is " +
@@ -751,29 +771,11 @@ public abstract class Storage extends St
protected void getFields(Properties props,
StorageDirectory sd
) throws IOException {
- String sv, st, sid, sct;
- sv = props.getProperty("layoutVersion");
- st = props.getProperty("storageType");
- sid = props.getProperty("namespaceID");
- sct = props.getProperty("cTime");
- if (sv == null || st == null || sid == null || sct == null)
- throw new InconsistentFSStateException(sd.root,
- "file " + STORAGE_FILE_VERSION + " is invalid.");
- int rv = Integer.parseInt(sv);
- NodeType rt = NodeType.valueOf(st);
- int rid = Integer.parseInt(sid);
- long rct = Long.parseLong(sct);
- if (!storageType.equals(rt) ||
- !((namespaceID == 0) || (rid == 0) || namespaceID == rid))
- throw new InconsistentFSStateException(sd.root,
- "is incompatible with others.");
- if (rv < FSConstants.LAYOUT_VERSION) // future version
- throw new IncorrectVersionException(rv, "storage directory "
- + sd.root.getCanonicalPath());
- layoutVersion = rv;
- storageType = rt;
- namespaceID = rid;
- cTime = rct;
+ setLayoutVersion(props, sd);
+ setNamespaceID(props, sd);
+ setStorageType(props, sd);
+ setcTime(props, sd);
+ setClusterId(props, layoutVersion, sd);
}
/**
@@ -789,6 +791,10 @@ public abstract class Storage extends St
props.setProperty("layoutVersion", String.valueOf(layoutVersion));
props.setProperty("storageType", storageType.toString());
props.setProperty("namespaceID", String.valueOf(namespaceID));
+ // Set clusterID only for layout versions newer (numerically smaller) than LAST_PRE_FEDERATION_LAYOUT_VERSION
+ if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+ props.setProperty("clusterID", clusterID);
+ }
props.setProperty("cTime", String.valueOf(cTime));
}
@@ -871,10 +877,74 @@ public abstract class Storage extends St
public static String getRegistrationID(StorageInfo storage) {
return "NS-" + Integer.toString(storage.getNamespaceID())
+ + "-" + storage.getClusterID()
+ "-" + Integer.toString(storage.getLayoutVersion())
+ "-" + Long.toString(storage.getCTime());
}
+ String getProperty(Properties props, StorageDirectory sd,
+ String name) throws InconsistentFSStateException {
+ String property = props.getProperty(name);
+ if (property == null) {
+ throw new InconsistentFSStateException(sd.root, "file "
+ + STORAGE_FILE_VERSION + " has " + name + " mising.");
+ }
+ return property;
+ }
+
+ /** Validate and set storage type from {@link Properties}*/
+ protected void setStorageType(Properties props, StorageDirectory sd)
+ throws InconsistentFSStateException {
+ NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
+ if (!storageType.equals(type)) {
+ throw new InconsistentFSStateException(sd.root,
+ "node type is incompatible with others.");
+ }
+ storageType = type;
+ }
+
+ /** Validate and set ctime from {@link Properties}*/
+ protected void setcTime(Properties props, StorageDirectory sd)
+ throws InconsistentFSStateException {
+ cTime = Long.parseLong(getProperty(props, sd, "cTime"));
+ }
+
+ /** Validate and set clusterId from {@link Properties}*/
+ protected void setClusterId(Properties props, int layoutVersion,
+ StorageDirectory sd) throws InconsistentFSStateException {
+ // No Cluster ID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
+ if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+ String cid = getProperty(props, sd, "clusterID");
+ if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
+ throw new InconsistentFSStateException(sd.getRoot(),
+ "cluster Id is incompatible with others.");
+ }
+ clusterID = cid;
+ }
+ }
+
+ /** Validate and set layout version from {@link Properties}*/
+ protected void setLayoutVersion(Properties props, StorageDirectory sd)
+ throws IncorrectVersionException, InconsistentFSStateException {
+ int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
+ if (lv < FSConstants.LAYOUT_VERSION) { // future version
+ throw new IncorrectVersionException(lv, "storage directory "
+ + sd.root.getAbsolutePath());
+ }
+ layoutVersion = lv;
+ }
+
+ /** Validate and set namespaceID from {@link Properties}*/
+ protected void setNamespaceID(Properties props, StorageDirectory sd)
+ throws InconsistentFSStateException {
+ int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
+ if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
+ throw new InconsistentFSStateException(sd.root,
+ "namespaceID is incompatible with others.");
+ }
+ namespaceID = nsId;
+ }
+
public static boolean is203LayoutVersion(int layoutVersion) {
for (int lv203 : LAYOUT_VERSIONS_203) {
if (lv203 == layoutVersion) {
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Thu May 5 05:40:07 2011
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.io.WritableUtils;
/**
* Common class for storage information.
@@ -34,14 +34,16 @@ import org.apache.hadoop.io.Writable;
public class StorageInfo implements Writable {
public int layoutVersion; // layout version of the storage data
public int namespaceID; // id of the file system
+ public String clusterID; // id of the cluster
public long cTime; // creation time of the file system state
public StorageInfo () {
- this(0, 0, 0L);
+ this(0, 0, "", 0L);
}
- public StorageInfo(int layoutV, int nsID, long cT) {
+ public StorageInfo(int layoutV, int nsID, String cid, long cT) {
layoutVersion = layoutV;
+ clusterID = cid;
namespaceID = nsID;
cTime = cT;
}
@@ -63,13 +65,19 @@ public class StorageInfo implements Writ
public int getNamespaceID() { return namespaceID; }
/**
+ * cluster id of the file system.<p>
+ */
+ public String getClusterID() { return clusterID; }
+
+ /**
* Creation time of the file system state.<p>
* Modified during upgrades.
*/
public long getCTime() { return cTime; }
-
+
public void setStorageInfo(StorageInfo from) {
layoutVersion = from.layoutVersion;
+ clusterID = from.clusterID;
namespaceID = from.namespaceID;
cTime = from.cTime;
}
@@ -80,12 +88,21 @@ public class StorageInfo implements Writ
public void write(DataOutput out) throws IOException {
out.writeInt(getLayoutVersion());
out.writeInt(getNamespaceID());
+ WritableUtils.writeString(out, clusterID);
out.writeLong(getCTime());
}
public void readFields(DataInput in) throws IOException {
layoutVersion = in.readInt();
namespaceID = in.readInt();
+ clusterID = WritableUtils.readString(in);
cTime = in.readLong();
}
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("lv=").append(layoutVersion).append(";cid=").append(clusterID)
+ .append(";nsid=").append(namespaceID).append(";c=").append(cTime);
+ return sb.toString();
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu May 5 05:40:07 2011
@@ -36,14 +36,16 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -87,14 +89,14 @@ class BlockReceiver implements Closeable
private final boolean isDatanode;
/** the block to receive */
- private final Block block;
+ private final ExtendedBlock block;
/** the replica to write */
private final ReplicaInPipelineInterface replicaInfo;
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
- BlockReceiver(final Block block, final DataInputStream in,
+ BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
@@ -145,14 +147,16 @@ class BlockReceiver implements Closeable
case PIPELINE_SETUP_APPEND:
replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
+ block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
+ block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
@@ -267,7 +271,8 @@ class BlockReceiver implements Closeable
* affect this datanode unless it is caused by interruption.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
- LOG.info(datanode.dnRegistration + ":Exception writing block " +
+ String bpid = block.getBlockPoolId();
+ LOG.info(datanode.getDNRegistrationForBP(bpid) + ":Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
StringUtils.stringifyException(ioe));
if (Thread.interrupted()) { // shut down if the thread is interrupted
@@ -286,6 +291,7 @@ class BlockReceiver implements Closeable
private void verifyChunks( byte[] dataBuf, int dataOff, int len,
byte[] checksumBuf, int checksumOff )
throws IOException {
+ DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId());
while (len > 0) {
int chunkLen = Math.min(len, bytesPerChecksum);
@@ -298,7 +304,7 @@ class BlockReceiver implements Closeable
srcDataNode + " to namenode");
LocatedBlock lb = new LocatedBlock(block,
new DatanodeInfo[] {srcDataNode});
- datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+ nn.reportBadBlocks(new LocatedBlock[] {lb});
} catch (IOException e) {
LOG.warn("Failed to report bad block " + block +
" from datanode " + srcDataNode + " to namenode");
@@ -974,10 +980,12 @@ class BlockReceiver implements Closeable
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
+ DatanodeRegistration dnR =
+ datanode.getDNRegistrationForBP(block.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
inAddr, myAddr, block.getNumBytes(),
"HDFS_WRITE", clientname, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+ dnR.getStorageID(), block, endTime-startTime));
} else {
LOG.info("Received block " + block + " of size "
+ block.getNumBytes() + " from " + inAddr);
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu May 5 05:40:07 2011
@@ -31,7 +31,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -47,7 +47,7 @@ class BlockSender implements java.io.Clo
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private Block block; // the block to read from
+ private ExtendedBlock block; // the block to read from
/** the replica to read from */
private final Replica replica;
@@ -83,21 +83,22 @@ class BlockSender implements java.io.Clo
private volatile ChunkChecksum lastChunkChecksum = null;
- BlockSender(Block block, long startOffset, long length,
+ BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, DataNode datanode) throws IOException {
this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
verifyChecksum, datanode, null);
}
- BlockSender(Block block, long startOffset, long length,
+ BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
throws IOException {
try {
this.block = block;
synchronized(datanode.data) {
- this.replica = datanode.data.getReplica(block.getBlockId());
+ this.replica = datanode.data.getReplica(block.getBlockPoolId(),
+ block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
@@ -153,9 +154,8 @@ class BlockSender implements java.io.Clo
this.clientTraceFmt = clientTraceFmt;
if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
- checksumIn = new DataInputStream(
- new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
- BUFFER_SIZE));
+ checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
+ .getMetaDataInputStream(block), BUFFER_SIZE));
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -201,7 +201,8 @@ class BlockSender implements java.io.Clo
|| (length + startOffset) > endOffset) {
String msg = " Offset " + startOffset + " and length " + length
+ " don't match block " + block + " ( blockLen " + endOffset + " )";
- LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+ LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
+ ":sendBlock() : " + msg);
throw new IOException(msg);
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java Thu May 5 05:40:07 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
@@ -44,7 +46,7 @@ public interface BlockVolumeChoosingPoli
* @return the chosen volume to store the block.
* @throws IOException when disks are unavailable or are full.
*/
- public FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
+ public FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
throws IOException;
}
|