hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1099687 [3/15] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/pro...
Date Thu, 05 May 2011 05:40:13 GMT
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu May  5 05:40:07 2011
@@ -24,16 +24,14 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
-import java.util.EnumSet;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -46,45 +44,31 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -187,27 +171,19 @@ import org.apache.hadoop.util.ToolRunner
  */
 
 @InterfaceAudience.Private
-public class Balancer implements Tool {
-  private static final Log LOG = 
-    LogFactory.getLog(Balancer.class.getName());
+public class Balancer {
+  static final Log LOG = LogFactory.getLog(Balancer.class);
   final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+  private static long WIN_WIDTH = 5400*1000L; // 1.5 hours
 
  /** The maximum number of concurrent block moves for 
   * balancing purposes at a datanode
   */
   public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   
-  private Configuration conf;
-
-  private double threshold = 10D;
-  private NamenodeProtocol namenode;
-  private ClientProtocol client;
-  private FileSystem fs;
-  private boolean isBlockTokenEnabled;
-  private boolean shouldRun;
-  private long keyUpdaterInterval;
-  private BlockTokenSecretManager blockTokenSecretManager;
-  private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
+  private final NameNodeConnector nnc;
+  private final BalancingPolicy policy;
+  private final double threshold;
   private final static Random rnd = new Random();
   
   // all data node lists
@@ -233,8 +209,6 @@ public class Balancer implements Tool {
   
   private NetworkTopology cluster = new NetworkTopology();
   
-  private double avgUtilization = 0.0D;
-  
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -242,6 +216,7 @@ public class Balancer implements Tool {
   final private ExecutorService dispatcherExecutor =
     Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
   
+
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
@@ -369,14 +344,9 @@ public class Balancer implements Tool {
     
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-      if (isBlockTokenEnabled) {
-        accessToken = blockTokenSecretManager.generateToken(null, block
-            .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
-            BlockTokenSecretManager.AccessMode.COPY));
-      }
-      DataTransferProtocol.Sender.opReplaceBlock(out,
-          block.getBlock(), source.getStorageID(), 
+      final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+      final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+      DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     
@@ -487,30 +457,33 @@ public class Balancer implements Tool {
     }
   }
   
-  /* Return the utilization of a datanode */
-  static private double getUtilization(DatanodeInfo datanode) {
-    return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
-  }
   
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode {
     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
-    protected DatanodeInfo datanode;
-    private double utilization;
-    protected long maxSizeToMove;
+    final DatanodeInfo datanode;
+    final double utilization;
+    final long maxSize2Move;
     protected long scheduledSize = 0L;
     //  blocks being moved but not confirmed yet
     private List<PendingBlockMove> pendingBlocks = 
       new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
     
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "[" + getName()
+          + ", utilization=" + utilization + "]";
+    }
+
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(
-        DatanodeInfo node, double avgUtil, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
       datanode = node;
-      utilization = Balancer.getUtilization(node);
-        
+      utilization = policy.getUtilization(node);
+      final double avgUtil = policy.getAvgUtilization();
+      long maxSizeToMove;
+
       if (utilization >= avgUtil+threshold
           || utilization <= avgUtil-threshold) { 
         maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
@@ -521,7 +494,7 @@ public class Balancer implements Tool {
       if (utilization < avgUtil ) {
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
-      maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
     }
     
     /** Get the datanode */
@@ -541,12 +514,12 @@ public class Balancer implements Tool {
     
     /** Decide if still need to move more bytes */
     protected boolean isMoveQuotaFull() {
-      return scheduledSize<maxSizeToMove;
+      return scheduledSize<maxSize2Move;
     }
 
     /** Return the total number of bytes that need to be moved */
     protected long availableSizeToMove() {
-      return maxSizeToMove-scheduledSize;
+      return maxSize2Move-scheduledSize;
     }
     
     /* increment scheduled size */
@@ -604,8 +577,8 @@ public class Balancer implements Tool {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, double avgUtil, double threshold) {
-      super(node, avgUtil, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+      super(node, policy, threshold);
     }
     
     /** Add a node task */
@@ -626,7 +599,7 @@ public class Balancer implements Tool {
      * Return the total size of the received blocks in the number of bytes.
      */
     private long getBlockList() throws IOException {
-      BlockWithLocations[] newBlocks = namenode.getBlocks(datanode, 
+      BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, 
         Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks) {
@@ -780,160 +753,25 @@ public class Balancer implements Tool {
   /* Check that this Balancer is compatible with the Block Placement Policy
    * used by the Namenode.
    */
-  private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+  private static void checkReplicationPolicyCompatibility(Configuration conf
+      ) throws UnsupportedActionException {
     if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
         BlockPlacementPolicyDefault.class) {
       throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
     }
   }
-  
-  /** Default constructor */
-  Balancer() throws UnsupportedActionException {
-  }
-  
-  /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) throws UnsupportedActionException {
-    checkReplicationPolicyCompatibility(conf);
-    setConf(conf);
-  } 
-
-  /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
-    checkReplicationPolicyCompatibility(conf);
-    setConf(conf);
-    this.threshold = threshold;
-  }
 
   /**
-   * Run a balancer
-   * @param args
-   */
-  public static void main(String[] args) {
-    try {
-      System.exit( ToolRunner.run(null, new Balancer(), args) );
-    } catch (Throwable e) {
-      LOG.error(StringUtils.stringifyException(e));
-      System.exit(-1);
-    }
-
-  }
-
-  private static void printUsage() {
-    System.out.println("Usage: java Balancer");
-    System.out.println("          [-threshold <threshold>]\t" 
-        +"percentage of disk capacity");
-  }
-
-  /* parse argument to get the threshold */
-  private double parseArgs(String[] args) {
-    double threshold=0;
-    int argsLen = (args == null) ? 0 : args.length;
-    if (argsLen==0) {
-      threshold = 10;
-    } else {
-      if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
-        printUsage();
-        throw new IllegalArgumentException(Arrays.toString(args));
-      } else {
-        try {
-          threshold = Double.parseDouble(args[1]);
-          if (threshold < 0 || threshold >100) {
-            throw new NumberFormatException();
-          }
-          LOG.info( "Using a threshold of " + threshold );
-        } catch(NumberFormatException e) {
-          System.err.println(
-              "Expect a double parameter in the range of [0, 100]: "+ args[1]);
-          printUsage();
-          throw e;
-        }
-      }
-    }
-    return threshold;
-  }
-  
-  /* Initialize balancer. It sets the value of the threshold, and 
+   * Construct a balancer.
+   * Initialize balancer. It sets the value of the threshold, and 
    * builds the communication proxies to
    * namenode as a client and a secondary namenode and retry proxies
    * when connection fails.
    */
-  private void init(double threshold) throws IOException {
-    this.threshold = threshold;
-    this.namenode = createNamenode(conf);
-    this.client = DFSClient.createNamenode(conf);
-    this.fs = FileSystem.get(conf);
-    ExportedBlockKeys keys = namenode.getBlockKeys();
-    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-    if (isBlockTokenEnabled) {
-      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long blockTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Block token params received from NN: keyUpdateInterval="
-          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + blockTokenLifetime / (60 * 1000) + " min(s)");
-      this.blockTokenSecretManager = new BlockTokenSecretManager(false,
-          blockKeyUpdateInterval, blockTokenLifetime);
-      this.blockTokenSecretManager.setKeys(keys);
-      /*
-       * Balancer should sync its block keys with NN more frequently than NN
-       * updates its block keys
-       */
-      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its block keys every "
-          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
-      this.shouldRun = true;
-      this.keyupdaterthread.start();
-    }
-  }
-  
-  /**
-   * Periodically updates access keys.
-   */
-  class BlockKeyUpdater implements Runnable {
-
-    public void run() {
-      while (shouldRun) {
-        try {
-          blockTokenSecretManager.setKeys(namenode.getBlockKeys());
-        } catch (Exception e) {
-          LOG.error(StringUtils.stringifyException(e));
-        }
-        try {
-          Thread.sleep(keyUpdaterInterval);
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
-  
-  /* Build a NamenodeProtocol connection to the namenode and
-   * set up the retry policy */ 
-  private static NamenodeProtocol createNamenode(Configuration conf)
-    throws IOException {
-    InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        timeoutPolicy, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-        new HashMap<String, RetryPolicy>();
-    methodNameToPolicyMap.put("getBlocks", methodPolicy);
-    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
-    UserGroupInformation ugi;
-    ugi = UserGroupInformation.getCurrentUser();
-
-    return (NamenodeProtocol) RetryProxy.create(
-        NamenodeProtocol.class,
-        RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID,
-            nameNodeAddr,
-            ugi,
-            conf,
-            NetUtils.getDefaultSocketFactory(conf)),
-        methodNameToPolicyMap);
+  Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+    this.threshold = p.threshold;
+    this.policy = p.policy;
+    this.nnc = theblockpool;
   }
   
   /* Shuffle datanode array */
@@ -946,13 +784,6 @@ public class Balancer implements Tool {
     }
   }
   
-  /* get all live datanodes of a cluster and their disk usage
-   * decide the number of bytes need to be moved
-   */
-  private long initNodes() throws IOException {
-    return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
-  }
-  
   /* Given a data node set, build a network topology and decide
    * over-utilized datanodes, above average utilized datanodes, 
    * below average utilized datanodes, and underutilized datanodes. 
@@ -968,15 +799,13 @@ public class Balancer implements Tool {
    */
   private long initNodes(DatanodeInfo[] datanodes) {
     // compute average utilization
-    long totalCapacity=0L, totalUsedSpace=0L;
     for (DatanodeInfo datanode : datanodes) {
       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
         continue; // ignore decommissioning or decommissioned nodes
       }
-      totalCapacity += datanode.getCapacity();
-      totalUsedSpace += datanode.getDfsUsed();
+      policy.accumulateSpaces(datanode);
     }
-    this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+    policy.initAvgUtilization();
 
     /*create network topology and all data node lists: 
      * overloaded, above-average, below-average, and underloaded
@@ -991,19 +820,20 @@ public class Balancer implements Tool {
       }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
-      if (getUtilization(datanode) > avgUtilization) {
-        datanodeS = new Source(datanode, avgUtilization, threshold);
+      final double avg = policy.getAvgUtilization();
+      if (policy.getUtilization(datanode) > avg) {
+        datanodeS = new Source(datanode, policy, threshold);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
           assert(isOverUtilized(datanodeS)) :
            datanodeS.getName() + " is not an overUtilized node";
           this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+          overLoadedBytes += (long)((datanodeS.utilization-avg
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
         }
       } else {
-        datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold);
         if ( isBelowOrEqualAvgUtilized(datanodeS)) {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {
@@ -1011,7 +841,7 @@ public class Balancer implements Tool {
               + datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
               + ", utilization=" + datanodeS.utilization; 
           this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avgUtilization-threshold-
+          underLoadedBytes += (long)((avg-threshold-
               datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
         }
       }
@@ -1019,7 +849,7 @@ public class Balancer implements Tool {
     }
 
     //logging
-    logImbalancedNodes();
+    logNodes();
     
     assert (this.datanodes.size() == 
       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1031,25 +861,20 @@ public class Balancer implements Tool {
   }
 
   /* log the over utilized & under utilized nodes */
-  private void logImbalancedNodes() {
-    StringBuilder msg = new StringBuilder();
-    msg.append(overUtilizedDatanodes.size());
-    msg.append(" over utilized nodes:");
-    for (Source node : overUtilizedDatanodes) {
-      msg.append( " " );
-      msg.append( node.getName() );
-    }
-    LOG.info(msg);
-    msg = new StringBuilder();
-    msg.append(underUtilizedDatanodes.size());
-    msg.append(" under utilized nodes: ");
-    for (BalancerDatanode node : underUtilizedDatanodes) {
-      msg.append( " " );
-      msg.append( node.getName() );
-    }
-    LOG.info(msg);
+  private void logNodes() {
+    logNodes("over-utilized", overUtilizedDatanodes);
+    if (LOG.isTraceEnabled()) {
+      logNodes("above-average", aboveAvgUtilizedDatanodes);
+      logNodes("below-average", belowAvgUtilizedDatanodes);
+    }
+    logNodes("underutilized", underUtilizedDatanodes);
   }
-  
+
+  private static <T extends BalancerDatanode> void logNodes(
+      String name, Collection<T> nodes) {
+    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  }
+
   /* Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
    * Maximum bytes to be moved per node is
@@ -1313,7 +1138,6 @@ public class Balancer implements Tool {
    */ 
   private static class MovedBlocks {
     private long lastCleanupTime = System.currentTimeMillis();
-    private static long winWidth = 5400*1000L; // 1.5 hour
     final private static int CUR_WIN = 0;
     final private static int OLD_WIN = 1;
     final private static int NUM_WINS = 2;
@@ -1326,13 +1150,6 @@ public class Balancer implements Tool {
       movedBlocks.add(new HashMap<Block,BalancerBlock>());
     }
 
-    /* set the win width */
-    private void setWinWidth(Configuration conf) {
-      winWidth = conf.getLong(
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
-    }
-    
     /* add a block thus marking a block to be moved */
     synchronized private void add(BalancerBlock block) {
       movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
@@ -1353,7 +1170,7 @@ public class Balancer implements Tool {
     synchronized private void cleanup() {
       long curTime = System.currentTimeMillis();
       // check if old win is older than winWidth
-      if (lastCleanupTime + winWidth <= curTime) {
+      if (lastCleanupTime + WIN_WIDTH <= curTime) {
         // purge the old window
         movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
         movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
@@ -1419,7 +1236,7 @@ public class Balancer implements Tool {
     this.datanodes.clear();
     this.sources.clear();
     this.targets.clear();  
-    this.avgUtilization = 0.0D;
+    this.policy.reset();
     cleanGlobalBlockList();
     this.movedBlocks.cleanup();
   }
@@ -1439,182 +1256,172 @@ public class Balancer implements Tool {
   
   /* Return true if the given datanode is overUtilized */
   private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (avgUtilization+threshold);
+    return datanode.utilization > (policy.getAvgUtilization()+threshold);
   }
   
   /* Return true if the given datanode is above average utilized
    * but not overUtilized */
   private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    return (datanode.utilization <= (avgUtilization+threshold))
-        && (datanode.utilization > avgUtilization);
+    final double avg = policy.getAvgUtilization();
+    return (datanode.utilization <= (avg+threshold))
+        && (datanode.utilization > avg);
   }
   
   /* Return true if the given datanode is underUtilized */
   private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (avgUtilization-threshold);
+    return datanode.utilization < (policy.getAvgUtilization()-threshold);
   }
 
   /* Return true if the given datanode is below average utilized 
    * but not underUtilized */
   private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-        return (datanode.utilization >= (avgUtilization-threshold))
-                 && (datanode.utilization <= avgUtilization);
+    final double avg = policy.getAvgUtilization();
+    return (datanode.utilization >= (avg-threshold))
+             && (datanode.utilization <= avg);
   }
 
   // Exit status
-  final public static int SUCCESS = 1;
-  final public static int ALREADY_RUNNING = -1;
-  final public static int NO_MOVE_BLOCK = -2;
-  final public static int NO_MOVE_PROGRESS = -3;
-  final public static int IO_EXCEPTION = -4;
-  final public static int ILLEGAL_ARGS = -5;
-  /** main method of Balancer
-   * @param args arguments to a Balancer
-   * @throws Exception exception that occured during datanode balancing
-   */
-  public int run(String[] args) throws Exception {
-    long startTime = Util.now();
-    OutputStream out = null;
+  enum ReturnStatus {
+    SUCCESS(1),
+    IN_PROGRESS(0),
+    ALREADY_RUNNING(-1),
+    NO_MOVE_BLOCK(-2),
+    NO_MOVE_PROGRESS(-3),
+    IO_EXCEPTION(-4),
+    ILLEGAL_ARGS(-5),
+    INTERRUPTED(-6);
+
+    final int code;
+
+    ReturnStatus(int code) {
+      this.code = code;
+    }
+  }
+
+  /** Run an iteration for all datanodes. */
+  private ReturnStatus run(int iteration, Formatter formatter) {
     try {
-      // initialize a balancer
-      init(parseArgs(args));
+      /* get all live datanodes of a cluster and their disk usage
+       * decide the number of bytes need to be moved
+       */
+      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+      if (bytesLeftToMove == 0) {
+        System.out.println("The cluster is balanced. Exiting...");
+        return ReturnStatus.SUCCESS;
+      } else {
+        LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+            + " to make the cluster balanced." );
+      }
       
-      /* Check if there is another balancer running.
-       * Exit if there is another one running.
+      /* Decide all the nodes that will participate in the block move and
+       * the number of bytes that need to be moved from one node to another
+       * in this iteration. Maximum bytes to be moved per node is
+       * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
        */
-      out = checkAndMarkRunningBalancer(); 
-      if (out == null) {
-        System.out.println("Another balancer is running. Exiting...");
-        return ALREADY_RUNNING;
+      final long bytesToMove = chooseNodes();
+      if (bytesToMove == 0) {
+        System.out.println("No block can be moved. Exiting...");
+        return ReturnStatus.NO_MOVE_BLOCK;
+      } else {
+        LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
+            " in this iteration");
       }
 
-      Formatter formatter = new Formatter(System.out);
-      System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
-      int iterations = 0;
-      while (true ) {
-        /* get all live datanodes of a cluster and their disk usage
-         * decide the number of bytes need to be moved
-         */
-        long bytesLeftToMove = initNodes();
-        if (bytesLeftToMove == 0) {
-          System.out.println("The cluster is balanced. Exiting...");
-          return SUCCESS;
-        } else {
-          LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
-              +" bytes to make the cluster balanced." );
-        }
-        
-        /* Decide all the nodes that will participate in the block move and
-         * the number of bytes that need to be moved from one node to another
-         * in this iteration. Maximum bytes to be moved per node is
-         * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
-         */
-        long bytesToMove = chooseNodes();
-        if (bytesToMove == 0) {
-          System.out.println("No block can be moved. Exiting...");
-          return NO_MOVE_BLOCK;
-        } else {
-          LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
-              "bytes in this iteration");
-        }
-   
-        formatter.format("%-24s %10d  %19s  %18s  %17s\n", 
-            DateFormat.getDateTimeInstance().format(new Date()),
-            iterations,
-            StringUtils.byteDesc(bytesMoved.get()),
-            StringUtils.byteDesc(bytesLeftToMove),
-            StringUtils.byteDesc(bytesToMove)
-            );
-        
-        /* For each pair of <source, target>, start a thread that repeatedly 
-         * decide a block to be moved and its proxy source, 
-         * then initiates the move until all bytes are moved or no more block
-         * available to move.
-         * Exit no byte has been moved for 5 consecutive iterations.
-         */
-        if (dispatchBlockMoves() > 0) {
-          notChangedIterations = 0;
-        } else {
-          notChangedIterations++;
-          if (notChangedIterations >= 5) {
-            System.out.println(
-                "No block has been moved for 5 iterations. Exiting...");
-            return NO_MOVE_PROGRESS;
-          }
-        }
-
-        // clean all lists
-        resetData();
-        
-        try {
-          Thread.sleep(2000*conf.getLong(
-              DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-              DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT));
-        } catch (InterruptedException ignored) {
+      formatter.format("%-24s %10d  %19s  %18s  %17s\n", 
+          DateFormat.getDateTimeInstance().format(new Date()),
+          iteration,
+          StringUtils.byteDesc(bytesMoved.get()),
+          StringUtils.byteDesc(bytesLeftToMove),
+          StringUtils.byteDesc(bytesToMove)
+          );
+      
+      /* For each pair of <source, target>, start a thread that repeatedly 
+       * decide a block to be moved and its proxy source, 
+       * then initiates the move until all bytes are moved or no more block
+       * available to move.
+       * Exit if no byte has been moved for 5 consecutive iterations.
+       */
+      if (dispatchBlockMoves() > 0) {
+        notChangedIterations = 0;
+      } else {
+        notChangedIterations++;
+        if (notChangedIterations >= 5) {
+          System.out.println(
+              "No block has been moved for 5 iterations. Exiting...");
+          return ReturnStatus.NO_MOVE_PROGRESS;
         }
-        
-        iterations++;
       }
-    } catch (IllegalArgumentException ae) {
-      return ILLEGAL_ARGS;
+
+      // clean all lists
+      resetData();
+      return ReturnStatus.IN_PROGRESS;
+    } catch (IllegalArgumentException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ReturnStatus.ILLEGAL_ARGS;
     } catch (IOException e) {
-      System.out.println("Received an IO exception: " + e.getMessage() +
-          " . Exiting...");
-      return IO_EXCEPTION;
+      System.out.println(e + ".  Exiting ...");
+      return ReturnStatus.IO_EXCEPTION;
+    } catch (InterruptedException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ReturnStatus.INTERRUPTED;
     } finally {
       // shutdown thread pools
       dispatcherExecutor.shutdownNow();
       moverExecutor.shutdownNow();
-
-      shouldRun = false;
-      try {
-        if (keyupdaterthread != null) keyupdaterthread.interrupt();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down access key updater thread", e);
-      }
-      // close the output file
-      IOUtils.closeStream(out); 
-      if (fs != null) {
-        try {
-          fs.delete(BALANCER_ID_PATH, true);
-        } catch(IOException ignored) {
-        }
-      }
-      System.out.println("Balancing took " + 
-          time2Str(Util.now()-startTime));
     }
   }
 
-  private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
-  /* The idea for making sure that there is no more than one balancer
-   * running in an HDFS is to create a file in the HDFS, writes the IP address
-   * of the machine on which the balancer is running to the file, but did not
-   * close the file until the balancer exits. 
-   * This prevents the second balancer from running because it can not
-   * creates the file while the first one is running.
-   * 
-   * This method checks if there is any running balancer and 
-   * if no, mark yes if no.
-   * Note that this is an atomic operation.
-   * 
-   * Return null if there is a running balancer; otherwise the output stream
-   * to the newly created file.
+  /**
+   * Balance all namenodes.
+   * For each iteration,
+   * for each namenode,
+   * execute a {@link Balancer} to work through all datanodes once.  
    */
-  private OutputStream checkAndMarkRunningBalancer() throws IOException {
+  static int run(List<InetSocketAddress> namenodes, final Parameters p,
+      Configuration conf) throws IOException, InterruptedException {
+    final long sleeptime = 2000*conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    LOG.info("namenodes = " + namenodes);
+    LOG.info("p         = " + p);
+    
+    final Formatter formatter = new Formatter(System.out);
+    System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
+    
+    final List<NameNodeConnector> connectors
+        = new ArrayList<NameNodeConnector>(namenodes.size());
     try {
-      DataOutputStream out = fs.create(BALANCER_ID_PATH);
-      out. writeBytes(InetAddress.getLocalHost().getHostName());
-      out.flush();
-      return out;
-    } catch(RemoteException e) {
-      if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
-        return null;
-      } else {
-        throw e;
+      for(InetSocketAddress isa : namenodes) {
+        connectors.add(new NameNodeConnector(isa, conf));
+      }
+    
+      boolean done = false;
+      for(int iteration = 0; !done; iteration++) {
+        done = true;
+        Collections.shuffle(connectors);
+        for(NameNodeConnector nnc : connectors) {
+          final Balancer b = new Balancer(nnc, p, conf);
+          final ReturnStatus r = b.run(iteration, formatter);
+          if (r == ReturnStatus.IN_PROGRESS) {
+            done = false;
+          } else if (r != ReturnStatus.SUCCESS) {
+            //must be an error status; return.
+            return r.code;
+          }
+        }
+
+        if (!done) {
+          Thread.sleep(sleeptime);
+        }
+      }
+    } finally {
+      for(NameNodeConnector nnc : connectors) {
+        nnc.close();
       }
     }
+    return ReturnStatus.SUCCESS.code;
   }
-  
+
  /* Given elapsedTime in ms, return a printable string */
   private static String time2Str(long elapsedTime) {
     String unit;
@@ -1635,15 +1442,116 @@ public class Balancer implements Tool {
     return time+" "+unit;
   }
 
-  /** return this balancer's configuration */
-  public Configuration getConf() {
-    return conf;
+  static class Parameters {
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0);
+
+    final BalancingPolicy policy;
+    final double threshold;
+
+    Parameters(BalancingPolicy policy, double threshold) {
+      this.policy = policy;
+      this.threshold = threshold;
+    }
+
+    @Override
+    public String toString() {
+      return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
+          + "[" + policy + ", threshold=" + threshold + "]";
+    }
   }
 
-  /** set this balancer's configuration */
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    movedBlocks.setWinWidth(conf);
+  static class Cli extends Configured implements Tool {
+    /** Parse arguments and then run Balancer */
+    @Override
+    public int run(String[] args) {
+      final long startTime = Util.now();
+      final Configuration conf = getConf();
+      WIN_WIDTH = conf.getLong(
+          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
+          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+
+      try {
+        checkReplicationPolicyCompatibility(conf);
+
+        final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
+        return Balancer.run(namenodes, parse(args), conf);
+      } catch (IOException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ReturnStatus.IO_EXCEPTION.code;
+      } catch (InterruptedException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ReturnStatus.INTERRUPTED.code;
+      } finally {
+        System.out.println("Balancing took " + time2Str(Util.now()-startTime));
+      }
+    }
+
+    /** parse command line arguments */
+    static Parameters parse(String[] args) {
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+
+      if (args != null) {
+        try {
+          for(int i = 0; i < args.length; i++) {
+            if ("-threshold".equalsIgnoreCase(args[i])) {
+              i++;
+              try {
+                threshold = Double.parseDouble(args[i]);
+                if (threshold < 0 || threshold > 100) {
+                  throw new NumberFormatException(
+                      "Number out of range: threshold = " + threshold);
+                }
+                LOG.info( "Using a threshold of " + threshold );
+              } catch(NumberFormatException e) {
+                System.err.println(
+                    "Expecting a number in the range of [0.0, 100.0]: "
+                    + args[i]);
+                throw e;
+              }
+            } else if ("-policy".equalsIgnoreCase(args[i])) {
+              i++;
+              try {
+                policy = BalancingPolicy.parse(args[i]);
+              } catch(IllegalArgumentException e) {
+                System.err.println("Illegal policy name: " + args[i]);
+                throw e;
+              }
+            } else {
+              throw new IllegalArgumentException("args = "
+                  + Arrays.toString(args));
+            }
+          }
+        } catch(RuntimeException e) {
+          printUsage();
+          throw e;
+        }
+      }
+      
+      return new Parameters(policy, threshold);
+    }
+
+    private static void printUsage() {
+      System.out.println("Usage: java " + Balancer.class.getSimpleName());
+      System.out.println("    [-policy <policy>]\tthe balancing policy: "
+          + BalancingPolicy.Node.INSTANCE.getName() + " or " 
+          + BalancingPolicy.Pool.INSTANCE.getName());
+      System.out.println(
+          "    [-threshold <threshold>]\tPercentage of disk capacity");
+    }
   }
 
+  /**
+   * Run a balancer
+   * @param args Command line arguments
+   */
+  public static void main(String[] args) {
+    try {
+      System.exit(ToolRunner.run(null, new Cli(), args));
+    } catch (Throwable e) {
+      LOG.error(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
 }

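The refactoring above moves the command-line entry point out of Balancer itself and into the new Cli inner class, while the static Balancer.run(List, Parameters, Configuration) loops over every namenode returned by DFSUtil.getNNServiceRpcAddresses(conf), so a federated cluster is balanced one block pool at a time. A minimal sketch of driving the new entry point programmatically (the wrapper class below is hypothetical, placed in the balancer package so it can reach the package-private Cli):

    package org.apache.hadoop.hdfs.server.balancer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class BalancerDriver {
      public static void main(String[] args) throws Exception {
        // Roughly equivalent to "bin/hadoop balancer -threshold 5":
        // Cli.run() parses the arguments into a Parameters instance,
        // resolves the namenode RPC addresses from the configuration,
        // and hands both to the static Balancer.run(...) loop above.
        Configuration conf = new Configuration();
        int exitCode = ToolRunner.run(conf, new Balancer.Cli(),
            new String[] {"-threshold", "5"});
        System.exit(exitCode);
      }
    }

Note the exit-code convention carried over from the old constants: ReturnStatus.SUCCESS maps to 1, not 0, and IN_PROGRESS(0) is only used internally to decide whether another iteration is needed.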
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu May  5 05:40:07 2011
@@ -41,6 +41,8 @@ public interface HdfsConstants {
   /** Startup options */
   static public enum StartupOption{
     FORMAT  ("-format"),
+    CLUSTERID ("-clusterid"),
+    GENCLUSTERID ("-genclusterid"),
     REGULAR ("-regular"),
     BACKUP  ("-backup"),
     CHECKPOINT("-checkpoint"),
@@ -50,6 +52,10 @@ public interface HdfsConstants {
     IMPORT  ("-importCheckpoint");
     
     private String name = null;
+    
+    // Used only with format and upgrade options
+    private String clusterId = null;
+    
     private StartupOption(String arg) {this.name = arg;}
     public String getName() {return name;}
     public NamenodeRole toNodeRole() {
@@ -62,7 +68,14 @@ public interface HdfsConstants {
         return NamenodeRole.ACTIVE;
       }
     }
-
+    
+    public void setClusterId(String cid) {
+      clusterId = cid;
+    }
+    
+    public String getClusterId() {
+      return clusterId;
+    }
   }
 
   // Timeouts for communicating with DataNode for streaming writes/reads

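The new clusterId field makes each StartupOption constant carry format-time state alongside the flag itself. A hedged sketch of how the accessors combine (the demo class is hypothetical; note that enum constants are singletons, so the stored id is effectively global state):

    import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;

    public class StartupOptionDemo {
      public static void main(String[] args) {
        // Roughly what an argument parser might do for "-format -clusterid c1":
        StartupOption opt = StartupOption.FORMAT;
        opt.setClusterId("c1");                 // used only with format/upgrade
        System.out.println(opt.getName());      // prints "-format"
        System.out.println(opt.getClusterId()); // prints "c1"
      }
    }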
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu May  5 05:40:07 2011
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.Random;
 import java.util.TreeSet;
 
+import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.jsp.JspWriter;
 
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -66,6 +68,7 @@ public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
   final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
   public static final String DELEGATION_PARAMETER_NAME = "delegation";
+  public static final String NAMENODE_ADDRESS = "nnaddr";
   static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
                                               "=";
   private static final Log LOG = LogFactory.getLog(JspHelper.class);
@@ -181,7 +184,7 @@ public class JspHelper {
     return chosenNode;
   }
 
-  public static void streamBlockInAscii(InetSocketAddress addr, 
+  public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, Configuration conf) throws IOException {
@@ -193,9 +196,9 @@ public class JspHelper {
       long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
       
       // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, blockId);
+      String file = BlockReader.getFileName(addr, poolId, blockId);
       BlockReader blockReader = BlockReader.newBlockReader(s, file,
-        new Block(blockId, 0, genStamp), blockToken,
+        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
         
     byte[] buf = new byte[(int)amtToRead];
@@ -359,14 +362,16 @@ public class JspHelper {
 
   public static void printPathWithLinks(String dir, JspWriter out, 
                                         int namenodeInfoPort,
-                                        String tokenString
+                                        String tokenString,
+                                        String nnAddress
                                         ) throws IOException {
     try {
       String[] parts = dir.split(Path.SEPARATOR);
       StringBuilder tempPath = new StringBuilder(dir.length());
       out.print("<a href=\"browseDirectory.jsp" + "?dir="+ Path.SEPARATOR
           + "&namenodeInfoPort=" + namenodeInfoPort
-          + getDelegationTokenUrlParam(tokenString) + "\">" + Path.SEPARATOR
+          + getDelegationTokenUrlParam(tokenString) 
+          + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR
           + "</a>");
       tempPath.append(Path.SEPARATOR);
       for (int i = 0; i < parts.length-1; i++) {
@@ -374,7 +379,8 @@ public class JspHelper {
           tempPath.append(parts[i]);
           out.print("<a href=\"browseDirectory.jsp" + "?dir="
               + tempPath.toString() + "&namenodeInfoPort=" + namenodeInfoPort
-              + getDelegationTokenUrlParam(tokenString));
+              + getDelegationTokenUrlParam(tokenString)
+              + getUrlParam(NAMENODE_ADDRESS, nnAddress));
           out.print("\">" + parts[i] + "</a>" + Path.SEPARATOR);
           tempPath.append(Path.SEPARATOR);
         }
@@ -391,7 +397,8 @@ public class JspHelper {
   public static void printGotoForm(JspWriter out,
                                    int namenodeInfoPort,
                                    String tokenString,
-                                   String file) throws IOException {
+                                   String file,
+                                   String nnAddress) throws IOException {
     out.print("<form action=\"browseDirectory.jsp\" method=\"get\" name=\"goto\">");
     out.print("Goto : ");
     out.print("<input name=\"dir\" type=\"text\" width=\"50\" id\"dir\" value=\""+ file+"\">");
@@ -402,6 +409,8 @@ public class JspHelper {
       out.print("<input name=\"" + DELEGATION_PARAMETER_NAME
           + "\" type=\"hidden\" value=\"" + tokenString + "\">");
     }
+    out.print("<input name=\""+ NAMENODE_ADDRESS +"\" type=\"hidden\" "
+        + "value=\"" + nnAddress  + "\">");
     out.print("</form>");
   }
   
@@ -475,16 +484,43 @@ public class JspHelper {
     return UserGroupInformation.createRemoteUser(strings[0]);
   }
 
+  private static String getNNServiceAddress(ServletContext context,
+      HttpServletRequest request) {
+    String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
+    InetSocketAddress namenodeAddress = null;
+    if (namenodeAddressInUrl != null) {
+      namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl);
+    } else if (context != null) {
+      namenodeAddress = (InetSocketAddress) context
+          .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+    }
+    if (namenodeAddress != null) {
+      return (namenodeAddress.getAddress().getHostAddress() + ":" 
+          + namenodeAddress.getPort());
+    }
+    return null;
+  }
+
+  /**
+   * See
+   * {@link JspHelper#getUGI(ServletContext, HttpServletRequest, Configuration)}
+   * with the ServletContext passed as null.
+   */
+  public static UserGroupInformation getUGI(HttpServletRequest request,
+      Configuration conf) throws IOException {
+    return getUGI(null, request, conf);
+  }
+  
   /**
    * Get {@link UserGroupInformation} and possibly the delegation token out of
    * the request.
+   * @param context the ServletContext that is serving this request.
    * @param request the http request
    * @return a new user from the request
    * @throws AccessControlException if the request has no token
    */
-  public static UserGroupInformation getUGI(HttpServletRequest request,
-                                            Configuration conf
-                                           ) throws IOException {
+  public static UserGroupInformation getUGI(ServletContext context,
+      HttpServletRequest request, Configuration conf) throws IOException {
     UserGroupInformation ugi = null;
     if(UserGroupInformation.isSecurityEnabled()) {
       String user = request.getRemoteUser();
@@ -493,12 +529,12 @@ public class JspHelper {
         Token<DelegationTokenIdentifier> token = 
           new Token<DelegationTokenIdentifier>();
         token.decodeFromUrlString(tokenString);
-        InetSocketAddress serviceAddr = NameNode.getAddress(conf);
-        LOG.info("Setting service in token: "
-            + new Text(serviceAddr.getAddress().getHostAddress() + ":"
-                + serviceAddr.getPort()));
-        token.setService(new Text(serviceAddr.getAddress().getHostAddress()
-            + ":" + serviceAddr.getPort()));
+        String serviceAddress = getNNServiceAddress(context, request);
+        if (serviceAddress != null) {
+          LOG.info("Setting service in token: "
+              + new Text(serviceAddress));
+          token.setService(new Text(serviceAddress));
+        }
         ByteArrayInputStream buf = new ByteArrayInputStream(token
             .getIdentifier());
         DataInputStream in = new DataInputStream(buf);
@@ -549,5 +585,13 @@ public class JspHelper {
     }
   }
 
-
+  /**
+   * Returns the URL parameter string for the given name/value pair.
+   * @param name parameter name
+   * @param val parameter value
+   * @return url parameter
+   */
+  public static String getUrlParam(String name, String val) {
+    return val == null ? "" : "&" + name + "=" + val;
+  }
 }

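The JspHelper changes thread the namenode address through every browse URL via the new nnaddr parameter, so a datanode web UI can tell which namenode a request belongs to. A small hypothetical sketch of the new getUrlParam helper's behavior:

    import org.apache.hadoop.hdfs.server.common.JspHelper;

    public class UrlParamDemo {
      public static void main(String[] args) {
        // getUrlParam returns "" for a null value, so callers can append it
        // unconditionally without producing a dangling "&nnaddr=".
        String base = "browseDirectory.jsp?dir=/&namenodeInfoPort=50070";
        System.out.println(base
            + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, "10.1.2.3:8020"));
        // -> browseDirectory.jsp?dir=/&namenodeInfoPort=50070&nnaddr=10.1.2.3:8020
        System.out.println(base
            + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, null));
        // -> browseDirectory.jsp?dir=/&namenodeInfoPort=50070
      }
    }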
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu May  5 05:40:07 2011
@@ -78,18 +78,21 @@ public abstract class Storage extends St
   // last layout version that did not support persistent rbw replicas
   public static final int PRE_RBW_LAYOUT_VERSION = -19;
   
+  // last layout version that is before federation
+  public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30;
+  
   /** Layout versions of 0.20.203 release */
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
-  private   static final String STORAGE_DIR_PREVIOUS  = "previous";
-  private   static final String STORAGE_TMP_REMOVED   = "removed.tmp";
-  private   static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";
-  private   static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
-  private   static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
-  private   static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
+  public    static final String STORAGE_DIR_PREVIOUS  = "previous";
+  public    static final String STORAGE_TMP_REMOVED   = "removed.tmp";
+  public    static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";
+  public    static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
+  public    static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
+  public    static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
   
   public enum StorageState {
     NON_EXISTENT,
@@ -115,7 +118,7 @@ public abstract class Storage extends St
     public boolean isOfType(StorageDirType type);
   }
   
-  private NodeType storageType;    // Type of the node using this storage 
+  protected NodeType storageType;    // Type of the node using this storage 
   protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
   
   private class DirIterator implements Iterator<StorageDirectory> {
@@ -208,19 +211,32 @@ public abstract class Storage extends St
    */
   @InterfaceAudience.Private
   public class StorageDirectory {
-    File              root; // root directory
-    FileLock          lock; // storage lock
-    StorageDirType dirType; // storage dir type
+    final File root;              // root directory
+    final boolean useLock;        // flag to enable storage lock
+    final StorageDirType dirType; // storage dir type
+    FileLock lock;                // storage lock
     
     public StorageDirectory(File dir) {
       // default dirType is null
-      this(dir, null);
+      this(dir, null, true);
     }
     
     public StorageDirectory(File dir, StorageDirType dirType) {
+      this(dir, dirType, true);
+    }
+    
+    /**
+     * Constructor
+     * @param dir directory corresponding to the storage
+     * @param dirType storage directory type
+     * @param useLock true - enables locking on the storage directory and false
+     *          disables locking
+     */
+    public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) {
       this.root = dir;
       this.lock = null;
       this.dirType = dirType;
+      this.useLock = useLock;
     }
     
     /**
@@ -245,22 +261,26 @@ public abstract class Storage extends St
     public void read() throws IOException {
       read(getVersionFile());
     }
-    
     public void read(File from) throws IOException {
+      Properties props = readFrom(from);
+      getFields(props, this);
+    }
+    
+    public Properties readFrom(File from) throws IOException {
       RandomAccessFile file = new RandomAccessFile(from, "rws");
       FileInputStream in = null;
+      Properties props = new Properties();
       try {
         in = new FileInputStream(file.getFD());
         file.seek(0);
-        Properties props = new Properties();
         props.load(in);
-        getFields(props, this);
       } finally {
         if (in != null) {
           in.close();
         }
         file.close();
       }
+      return props;
     }
 
     /**
@@ -620,6 +640,10 @@ public abstract class Storage extends St
      * @throws IOException if locking fails
      */
     public void lock() throws IOException {
+      if (!useLock) {
+        LOG.info("Locking is disabled");
+        return;
+      }
       this.lock = tryLock();
       if (lock == null) {
         String msg = "Cannot lock storage " + this.root 
@@ -676,11 +700,6 @@ public abstract class Storage extends St
     this.storageType = type;
   }
   
-  protected Storage(NodeType type, int nsID, long cT) {
-    super(FSConstants.LAYOUT_VERSION, nsID, cT);
-    this.storageType = type;
-  }
-  
   protected Storage(NodeType type, StorageInfo storageInfo) {
     super(storageInfo);
     this.storageType = type;
@@ -726,8 +745,9 @@ public abstract class Storage extends St
   public static void checkVersionUpgradable(int oldVersion) 
                                      throws IOException {
     if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) {
-      String msg = "*********** Upgrade is not supported from this older" +
-                   " version of storage to the current version." + 
+      String msg = "*********** Upgrade is not supported from this" +
+                   " older version " + oldVersion + 
+                   " of storage to the current version." + 
                    " Please upgrade to " + LAST_UPGRADABLE_HADOOP_VERSION +
                    " or a later version and then upgrade to current" +
                    " version. Old layout version is " + 
@@ -751,29 +771,11 @@ public abstract class Storage extends St
   protected void getFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
-    String sv, st, sid, sct;
-    sv = props.getProperty("layoutVersion");
-    st = props.getProperty("storageType");
-    sid = props.getProperty("namespaceID");
-    sct = props.getProperty("cTime");
-    if (sv == null || st == null || sid == null || sct == null)
-      throw new InconsistentFSStateException(sd.root,
-                                             "file " + STORAGE_FILE_VERSION + " is invalid.");
-    int rv = Integer.parseInt(sv);
-    NodeType rt = NodeType.valueOf(st);
-    int rid = Integer.parseInt(sid);
-    long rct = Long.parseLong(sct);
-    if (!storageType.equals(rt) ||
-        !((namespaceID == 0) || (rid == 0) || namespaceID == rid))
-      throw new InconsistentFSStateException(sd.root,
-                                             "is incompatible with others.");
-    if (rv < FSConstants.LAYOUT_VERSION) // future version
-      throw new IncorrectVersionException(rv, "storage directory " 
-                                          + sd.root.getCanonicalPath());
-    layoutVersion = rv;
-    storageType = rt;
-    namespaceID = rid;
-    cTime = rct;
+    setLayoutVersion(props, sd);
+    setNamespaceID(props, sd);
+    setStorageType(props, sd);
+    setcTime(props, sd);
+    setClusterId(props, layoutVersion, sd);
   }
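
Replacing the monolithic getFields() with one overridable setter per property
is a template-method refactoring: a subclass can override how a single field
is parsed or validated without duplicating the rest of the file handling. A
minimal self-contained sketch of the shape (VersionedStorage and its fields
are illustrative names, not taken from the patch):

import java.io.IOException;
import java.util.Properties;

abstract class VersionedStorage {
  protected int layoutVersion;
  protected int namespaceID;

  // Template method: fixed parsing order, overridable per-field steps.
  protected void getFields(Properties props) throws IOException {
    setLayoutVersion(props);
    setNamespaceID(props);
  }

  protected void setLayoutVersion(Properties props) throws IOException {
    layoutVersion = Integer.parseInt(require(props, "layoutVersion"));
  }

  protected void setNamespaceID(Properties props) throws IOException {
    namespaceID = Integer.parseInt(require(props, "namespaceID"));
  }

  // Counterpart of getProperty() below: a missing key is a hard error.
  static String require(Properties props, String name) throws IOException {
    String v = props.getProperty(name);
    if (v == null) {
      throw new IOException("VERSION file has " + name + " missing.");
    }
    return v;
  }
}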
   
   /**
@@ -789,6 +791,10 @@ public abstract class Storage extends St
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
     props.setProperty("storageType", storageType.toString());
     props.setProperty("namespaceID", String.valueOf(namespaceID));
+    // Set clusterID only in layouts newer than LAST_PRE_FEDERATION_LAYOUT_VERSION
+    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      props.setProperty("clusterID", clusterID);
+    }
     props.setProperty("cTime", String.valueOf(cTime));
   }
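
One detail worth spelling out: HDFS layout versions are negative integers
that decrease as the format evolves, so the test
layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION selects layouts newer than
the last pre-federation one. That is why clusterID is written here, and read
in setClusterId() below, only inside that branch. A tiny sketch of the
comparison, using an assumed constant value purely for illustration:

class LayoutVersionCheck {
  // Hypothetical value for illustration; the real constant lives in Storage.
  static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -19;

  // More negative == newer format; federation layouts carry a clusterID.
  static boolean supportsClusterId(int layoutVersion) {
    return layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION;
  }

  public static void main(String[] args) {
    System.out.println(supportsClusterId(-24)); // true: post-federation layout
    System.out.println(supportsClusterId(-18)); // false: pre-federation layout
  }
}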
 
@@ -871,10 +877,74 @@ public abstract class Storage extends St
 
   public static String getRegistrationID(StorageInfo storage) {
     return "NS-" + Integer.toString(storage.getNamespaceID())
+      + "-" + storage.getClusterID()
       + "-" + Integer.toString(storage.getLayoutVersion())
       + "-" + Long.toString(storage.getCTime());
   }
   
+  String getProperty(Properties props, StorageDirectory sd,
+      String name) throws InconsistentFSStateException {
+    String property = props.getProperty(name);
+    if (property == null) {
+      throw new InconsistentFSStateException(sd.root, "file "
+          + STORAGE_FILE_VERSION + " has " + name + " missing.");
+    }
+    return property;
+  }
+  
+  /** Validate and set storage type from {@link Properties} */
+  protected void setStorageType(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
+    if (!storageType.equals(type)) {
+      throw new InconsistentFSStateException(sd.root,
+          "node type is incompatible with others.");
+    }
+    storageType = type;
+  }
+  
+  /** Validate and set cTime from {@link Properties} */
+  protected void setcTime(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    cTime = Long.parseLong(getProperty(props, sd, "cTime"));
+  }
+
+  /** Validate and set clusterID from {@link Properties} */
+  protected void setClusterId(Properties props, int layoutVersion,
+      StorageDirectory sd) throws InconsistentFSStateException {
+    // No Cluster ID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
+    if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      String cid = getProperty(props, sd, "clusterID");
+      if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "cluster Id is incompatible with others.");
+      }
+      clusterID = cid;
+    }
+  }
+  
+  /** Validate and set layout version from {@link Properties} */
+  protected void setLayoutVersion(Properties props, StorageDirectory sd)
+      throws IncorrectVersionException, InconsistentFSStateException {
+    int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
+    if (lv < FSConstants.LAYOUT_VERSION) { // future version
+      throw new IncorrectVersionException(lv, "storage directory "
+          + sd.root.getAbsolutePath());
+    }
+    layoutVersion = lv;
+  }
+  
+  /** Validate and set namespaceID from {@link Properties} */
+  protected void setNamespaceID(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
+    if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
+      throw new InconsistentFSStateException(sd.root,
+          "namespaceID is incompatible with others.");
+    }
+    namespaceID = nsId;
+  }
+  
   public static boolean is203LayoutVersion(int layoutVersion) {
     for (int lv203 : LAYOUT_VERSIONS_203) {
       if (lv203 == layoutVersion) {

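Taken together, the new setters validate each directory's VERSION file
against state already loaded from earlier directories, with 0 (namespaceID)
and "" (clusterID) acting as not-yet-assigned wildcards. A standalone sketch
of those two compatibility rules:

class ConsistencyRule {
  // Mirrors setNamespaceID(): 0 on either side means "not yet assigned"
  // and is compatible with anything.
  static boolean namespaceCompatible(int current, int fromFile) {
    return current == 0 || fromFile == 0 || current == fromFile;
  }

  // Mirrors setClusterId(): the empty string is the wildcard.
  static boolean clusterCompatible(String current, String fromFile) {
    return current.isEmpty() || fromFile.isEmpty() || current.equals(fromFile);
  }

  public static void main(String[] args) {
    System.out.println(namespaceCompatible(0, 42));          // true
    System.out.println(namespaceCompatible(41, 42));         // false
    System.out.println(clusterCompatible("", "CID-x"));      // true
    System.out.println(clusterCompatible("CID-x", "CID-y")); // false
  }
}
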
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Thu May  5 05:40:07 2011
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Common class for storage information.
@@ -34,14 +34,16 @@ import org.apache.hadoop.io.Writable;
 public class StorageInfo implements Writable {
   public int   layoutVersion;   // layout version of the storage data
   public int   namespaceID;     // id of the file system
+  public String clusterID;      // id of the cluster
   public long  cTime;           // creation time of the file system state
   
   public StorageInfo () {
-    this(0, 0, 0L);
+    this(0, 0, "", 0L);
   }
   
-  public StorageInfo(int layoutV, int nsID, long cT) {
+  public StorageInfo(int layoutV, int nsID, String cid, long cT) {
     layoutVersion = layoutV;
+    clusterID = cid;
     namespaceID = nsID;
     cTime = cT;
   }
@@ -63,13 +65,19 @@ public class StorageInfo implements Writ
   public int    getNamespaceID()  { return namespaceID; }
 
   /**
+   * Cluster ID of the file system.<p>
+   */
+  public String    getClusterID()  { return clusterID; }
+  
+  /**
    * Creation time of the file system state.<p>
    * Modified during upgrades.
    */
   public long   getCTime()        { return cTime; }
-
+  
   public void   setStorageInfo(StorageInfo from) {
     layoutVersion = from.layoutVersion;
+    clusterID = from.clusterID;
     namespaceID = from.namespaceID;
     cTime = from.cTime;
   }
@@ -80,12 +88,21 @@ public class StorageInfo implements Writ
   public void write(DataOutput out) throws IOException {
     out.writeInt(getLayoutVersion());
     out.writeInt(getNamespaceID());
+    WritableUtils.writeString(out, clusterID);
     out.writeLong(getCTime());
   }
 
   public void readFields(DataInput in) throws IOException {
     layoutVersion = in.readInt();
     namespaceID = in.readInt();
+    clusterID = WritableUtils.readString(in);
     cTime = in.readLong();
   }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("lv=").append(layoutVersion).append(";cid=").append(clusterID)
+        .append(";nsid=").append(namespaceID).append(";c=").append(cTime);
+    return sb.toString();
+  }
 }
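
Because clusterID is now serialized between namespaceID and cTime, a
StorageInfo written by this code cannot be read by a pre-federation peer;
both ends of an RPC must agree on the layout. A hedged round-trip sketch of
the new field order — writeUTF stands in for WritableUtils.writeString, which
uses a vint-length-prefixed encoding, so the exact bytes differ but the
ordering is the point:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class StorageInfoRoundTrip {
  public static void main(String[] args) throws IOException {
    // Write in the new order: layoutVersion, namespaceID, clusterID, cTime.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(-24);           // layoutVersion
    out.writeInt(12345);         // namespaceID
    out.writeUTF("CID-example"); // clusterID (stand-in encoding, see above)
    out.writeLong(0L);           // cTime
    out.flush();

    // Read back in the same order, as readFields() does.
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    System.out.println("lv=" + in.readInt() + ";nsid=" + in.readInt()
        + ";cid=" + in.readUTF() + ";c=" + in.readLong());
  }
}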

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu May  5 05:40:07 2011
@@ -36,14 +36,16 @@ import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -87,14 +89,14 @@ class BlockReceiver implements Closeable
   private final boolean isDatanode;
 
   /** the block to receive */
-  private final Block block; 
+  private final ExtendedBlock block; 
   /** the replica to write */
   private final ReplicaInPipelineInterface replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
 
-  BlockReceiver(final Block block, final DataInputStream in,
+  BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
@@ -145,14 +147,16 @@ class BlockReceiver implements Closeable
         case PIPELINE_SETUP_APPEND:
           replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
+                block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
+                block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           break;
@@ -267,7 +271,8 @@ class BlockReceiver implements Closeable
    * affect this datanode unless it is caused by interruption.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
-    LOG.info(datanode.dnRegistration + ":Exception writing block " +
+    String bpid = block.getBlockPoolId();
+    LOG.info(datanode.getDNRegistrationForBP(bpid) + ":Exception writing block " +
              block + " to mirror " + mirrorAddr + "\n" +
              StringUtils.stringifyException(ioe));
     if (Thread.interrupted()) { // shut down if the thread is interrupted
@@ -286,6 +291,7 @@ class BlockReceiver implements Closeable
   private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
                              byte[] checksumBuf, int checksumOff ) 
                              throws IOException {
+    DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId());
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       
@@ -298,7 +304,7 @@ class BlockReceiver implements Closeable
                       srcDataNode + " to namenode");
             LocatedBlock lb = new LocatedBlock(block, 
                                             new DatanodeInfo[] {srcDataNode});
-            datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+            nn.reportBadBlocks(new LocatedBlock[] {lb});
           } catch (IOException e) {
             LOG.warn("Failed to report bad block " + block + 
                       " from datanode " + srcDataNode + " to namenode");
@@ -974,10 +980,12 @@ class BlockReceiver implements Closeable
               datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() && isClient) {
                 long offset = 0;
+                DatanodeRegistration dnR = 
+                  datanode.getDNRegistrationForBP(block.getBlockPoolId());
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                       inAddr, myAddr, block.getNumBytes(),
                       "HDFS_WRITE", clientname, offset,
-                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+                      dnR.getStorageID(), block, endTime-startTime));
               } else {
                 LOG.info("Received block " + block + " of size "
                     + block.getNumBytes() + " from " + inAddr);

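Throughout BlockReceiver the plain Block is replaced by ExtendedBlock, which
pairs a block with the block pool it belongs to; per-pool state (block
scanner entries, the pool's namenode proxy, the pool-specific datanode
registration) is then keyed off getBlockPoolId(). A simplified stand-in
showing the pairing — the accessor names follow the calls visible in this
diff, but the class body here is illustrative:

// Simplified stand-in for org.apache.hadoop.hdfs.protocol.ExtendedBlock.
class ExtendedBlockSketch {
  private final String blockPoolId; // which namespace owns this block
  private final long blockId;
  private long generationStamp;

  ExtendedBlockSketch(String bpid, long blockId, long genStamp) {
    this.blockPoolId = bpid;
    this.blockId = blockId;
    this.generationStamp = genStamp;
  }

  String getBlockPoolId() { return blockPoolId; }
  long getBlockId()       { return blockId; }

  // Pipeline recovery bumps the generation stamp, as in the append cases above.
  void setGenerationStamp(long gs) { this.generationStamp = gs; }

  public String toString() {
    return blockPoolId + ":blk_" + blockId + "_" + generationStamp;
  }
}
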
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu May  5 05:40:07 2011
@@ -31,7 +31,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -47,7 +47,7 @@ class BlockSender implements java.io.Clo
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private Block block; // the block to read from
+  private ExtendedBlock block; // the block to read from
 
   /** the replica to read from */
   private final Replica replica;
@@ -83,21 +83,22 @@ class BlockSender implements java.io.Clo
   private volatile ChunkChecksum lastChunkChecksum = null;
 
   
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode) throws IOException {
     this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
          verifyChecksum, datanode, null);
   }
 
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
       synchronized(datanode.data) { 
-        this.replica = datanode.data.getReplica(block.getBlockId());
+        this.replica = datanode.data.getReplica(block.getBlockPoolId(), 
+            block.getBlockId());
         if (replica == null) {
           throw new ReplicaNotFoundException(block);
         }
@@ -153,9 +154,8 @@ class BlockSender implements java.io.Clo
       this.clientTraceFmt = clientTraceFmt;
 
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
-        checksumIn = new DataInputStream(
-                new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
-                                        BUFFER_SIZE));
+        checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
+            .getMetaDataInputStream(block), BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -201,7 +201,8 @@ class BlockSender implements java.io.Clo
           || (length + startOffset) > endOffset) {
         String msg = " Offset " + startOffset + " and length " + length
         + " don't match block " + block + " ( blockLen " + endOffset + " )";
-        LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+        LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
+            ":sendBlock() : " + msg);
         throw new IOException(msg);
       }
       

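BlockSender's replica lookup likewise now takes both a pool id and a block
id. One natural backing structure is a map per block pool; a small generic
sketch (ReplicaTable is a hypothetical name, not the dataset's actual
implementation):

import java.util.HashMap;
import java.util.Map;

class ReplicaTable<R> {
  // pool id -> (block id -> replica); one inner map per block pool.
  private final Map<String, Map<Long, R>> pools =
      new HashMap<String, Map<Long, R>>();

  // Both keys are required, matching getReplica(bpid, blockId) above.
  synchronized R getReplica(String bpid, long blockId) {
    Map<Long, R> pool = pools.get(bpid);
    return pool == null ? null : pool.get(blockId);
  }

  synchronized void add(String bpid, long blockId, R replica) {
    Map<Long, R> pool = pools.get(bpid);
    if (pool == null) {
      pool = new HashMap<Long, R>();
      pools.put(bpid, pool);
    }
    pool.put(blockId, replica);
  }
}
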
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java Thu May  5 05:40:07 2011
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
@@ -44,7 +46,7 @@ public interface BlockVolumeChoosingPoli
    * @return the chosen volume to store the block.
    * @throws IOException when disks are unavailable or are full.
    */
-  public FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
+  public FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
     throws IOException;
 
 }
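
Switching chooseVolume() from FSVolume[] to List<FSVolume> lets the candidate
volume set shrink while the datanode runs (for instance after a disk
failure). A hedged sketch of a round-robin policy against a simplified
stand-in interface — Volume exposes only the one method the policy needs and
is not the patch's FSVolume:

import java.io.IOException;
import java.util.List;

class RoundRobinPolicySketch {
  interface Volume {
    long getAvailable() throws IOException; // free bytes on this volume
  }

  private int curVolume = 0;

  // Start after the last chosen volume and return the first one with room;
  // walking the whole list without a fit means every disk is full.
  synchronized <T extends Volume> T chooseVolume(List<T> volumes, long blockSize)
      throws IOException {
    if (volumes.isEmpty()) {
      throw new IOException("No volumes available");
    }
    if (curVolume >= volumes.size()) {
      curVolume = 0; // the list may have shrunk since the last call
    }
    int start = curVolume;
    while (true) {
      T volume = volumes.get(curVolume);
      curVolume = (curVolume + 1) % volumes.size();
      if (volume.getAvailable() >= blockSize) {
        return volume;
      }
      if (curVolume == start) {
        throw new IOException("Out of space: all volumes are full");
      }
    }
  }
}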


