hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1158072 [2/7] - in /hadoop/common/branches/HDFS-1623/hdfs: ./ ivy/ src/c++/libhdfs/ src/contrib/ src/contrib/fuse-dfs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/ser...
Date: Tue, 16 Aug 2011 00:37:25 GMT
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 16 00:37:15 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -46,14 +44,16 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -61,8 +61,9 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
 /**
@@ -82,23 +83,13 @@ public class BlockManager {
   private volatile long pendingReplicationBlocksCount = 0L;
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long underReplicatedBlocksCount = 0L;
-  public volatile long scheduledReplicationBlocksCount = 0L;
+  private volatile long scheduledReplicationBlocksCount = 0L;
   private volatile long excessBlocksCount = 0L;
   private volatile long pendingDeletionBlocksCount = 0L;
   private boolean isBlockTokenEnabled;
   private long blockKeyUpdateInterval;
   private long blockTokenLifetime;
   private BlockTokenSecretManager blockTokenSecretManager;
-  
-  /** returns the isBlockTokenEnabled - true if block token enabled ,else false */
-  public boolean isBlockTokenEnabled() {
-    return isBlockTokenEnabled;
-  }
-
-  /** get the block key update interval */
-  public long getBlockKeyUpdateInterval() {
-    return blockKeyUpdateInterval;
-  }
 
   /** get the BlockTokenSecretManager */
   public BlockTokenSecretManager getBlockTokenSecretManager() {
@@ -137,10 +128,11 @@ public class BlockManager {
    * Mapping: Block -> { INode, datanodes, self ref }
    * Updated only in response to client-sent information.
    */
-  public final BlocksMap blocksMap;
+  final BlocksMap blocksMap;
 
   private final DatanodeManager datanodeManager;
-  
+  private final HeartbeatManager heartbeatManager;
+
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
   
@@ -173,13 +165,13 @@ public class BlockManager {
   private final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
-  public final int maxReplication;
+  public final short maxReplication;
   /** The maximum number of outgoing replication streams
    *  a given node should have at one time 
    */
-  public int maxReplicationStreams;
+  int maxReplicationStreams;
   /** Minimum copies needed or else write is disallowed */
-  public final int minReplication;
+  public final short minReplication;
   /** Default number of replicas */
   public final int defaultReplication;
   /** The maximum number of entries returned by getCorruptInodes() */
@@ -194,45 +186,11 @@ public class BlockManager {
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
   
-  /**
-   * Get access keys
-   * 
-   * @return current access keys
-   */
-  public ExportedBlockKeys getBlockKeys() {
-    return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
-        : ExportedBlockKeys.DUMMY_KEYS;
-  }
-  
-  /** Generate block token for a LocatedBlock. */
-  public void setBlockToken(LocatedBlock l) throws IOException {
-    Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
-        .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
-    l.setBlockToken(token);
-  }
-
-  /** Generate block tokens for the blocks to be returned. */
-  public void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
-    for(LocatedBlock l : locatedBlocks) {
-      setBlockToken(l);
-    }
-  }
-
-  /**
-   * Update access keys.
-   */
-  public void updateBlockKey() throws IOException {
-    this.blockTokenSecretManager.updateKeys();
-    synchronized (namesystem.heartbeats) {
-      for (DatanodeDescriptor nodeInfo : namesystem.heartbeats) {
-        nodeInfo.needKeyUpdate = true;
-      }
-    }
-  }
-  
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     namesystem = fsn;
-    datanodeManager = new DatanodeManager(fsn, conf);
+    datanodeManager = new DatanodeManager(this, fsn, conf);
+    heartbeatManager = datanodeManager.getHeartbeatManager();
+
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
     blockplacement = BlockPlacementPolicy.getInstance(
         conf, namesystem, datanodeManager.getNetworkTopology());
@@ -264,25 +222,28 @@ public class BlockManager {
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
     this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                                           DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-    this.maxReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
-                                      DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
-    this.minReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-                                      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
-    if (minReplication <= 0)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.namenode.replication.min = "
-                            + minReplication
-                            + " must be greater than 0");
-    if (maxReplication >= (int)Short.MAX_VALUE)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.max = "
-                            + maxReplication + " must be less than " + (Short.MAX_VALUE));
-    if (maxReplication < minReplication)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.namenode.replication.min = "
-                            + minReplication
-                            + " must be less than dfs.replication.max = "
-                            + maxReplication);
+
+    final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
+                                 DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
+    final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+                                 DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+    if (minR <= 0)
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR + " <= 0");
+    if (maxR > Short.MAX_VALUE)
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+          + " = " + maxR + " > " + Short.MAX_VALUE);
+    if (minR > maxR)
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR + " > "
+          + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+          + " = " + maxR);
+    this.minReplication = (short)minR;
+    this.maxReplication = (short)maxR;
+
     this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
     this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
@@ -387,6 +348,11 @@ public class BlockManager {
     getDatanodeManager().datanodeDump(out);
   }
 
+  /** @return maxReplicationStreams */
+  public int getMaxReplicationStreams() {
+    return maxReplicationStreams;
+  }
+
   /**
    * @param block
    * @return true if the block has minimum replicas
@@ -413,19 +379,8 @@ public class BlockManager {
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
     block.commitBlock(commitBlock);
-    
-    // Adjust disk space consumption if required
-    long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
-    if (diff > 0) {
-      try {
-        String path = /* For finding parents */
-        namesystem.leaseManager.findPath(fileINode);
-        namesystem.dir.updateSpaceConsumed(path, 0, -diff
-            * fileINode.getReplication());
-      } catch (IOException e) {
-        LOG.warn("Unexpected exception while updating disk space.", e);
-      }
-    }
+
+    namesystem.updateDiskSpaceConsumed(fileINode, commitBlock);
   }
   
   /**
@@ -527,7 +482,7 @@ public class BlockManager {
     }
 
     long fileLength = fileINode.computeContentSummary().getLength();
-    return getBlockLocation(ucBlock, fileLength - ucBlock.getNumBytes());
+    return createLocatedBlock(ucBlock, fileLength - ucBlock.getNumBytes());
   }
 
   /**
@@ -547,8 +502,9 @@ public class BlockManager {
     return machineSet;
   }
 
-  public List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
-      long length, int nrBlocksToReturn) throws IOException {
+  private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
+      final long offset, final long length, final int nrBlocksToReturn
+      ) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@@ -567,7 +523,7 @@ public class BlockManager {
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      results.add(getBlockLocation(blocks[curBlk], curPos));
+      results.add(createLocatedBlock(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -577,7 +533,7 @@ public class BlockManager {
   }
 
   /** @return a LocatedBlock for the given block */
-  public LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
       ) throws IOException {
     if (blk instanceof BlockInfoUnderConstruction) {
       if (blk.isComplete()) {
@@ -587,7 +543,8 @@ public class BlockManager {
       }
       final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
       final DatanodeDescriptor[] locations = uc.getExpectedLocations();
-      return namesystem.createLocatedBlock(uc, locations, pos, false);
+      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+      return new LocatedBlock(eb, locations, pos, false);
     }
 
     // get block locations
@@ -613,7 +570,78 @@ public class BlockManager {
           machines[j++] = d;
       }
     }
-    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+    return new LocatedBlock(eb, machines, pos, isCorrupt);
+  }
+
+  /** Create a LocatedBlocks. */
+  public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
+      final long fileSizeExcludeBlocksUnderConstruction,
+      final boolean isFileUnderConstruction,
+      final long offset, final long length, final boolean needBlockToken
+      ) throws IOException {
+    assert namesystem.hasReadOrWriteLock();
+    if (blocks == null) {
+      return null;
+    } else if (blocks.length == 0) {
+      return new LocatedBlocks(0, isFileUnderConstruction,
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+      }
+      final List<LocatedBlock> locatedblocks = createLocatedBlockList(
+          blocks, offset, length, Integer.MAX_VALUE);
+
+      final BlockInfo last = blocks[blocks.length - 1];
+      final long lastPos = last.isComplete()?
+          fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
+          : fileSizeExcludeBlocksUnderConstruction;
+      final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
+
+      if (isBlockTokenEnabled && needBlockToken) {
+        for(LocatedBlock lb : locatedblocks) {
+          setBlockToken(lb, AccessMode.READ);
+        }
+        setBlockToken(lastlb, AccessMode.READ);
+      }
+      return new LocatedBlocks(
+          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
+          locatedblocks, lastlb, last.isComplete());
+    }
+  }
+
+  /** @return current access keys. */
+  public ExportedBlockKeys getBlockKeys() {
+    return isBlockTokenEnabled? blockTokenSecretManager.exportKeys()
+        : ExportedBlockKeys.DUMMY_KEYS;
+  }
+
+  /** Generate a block token for the located block. */
+  public void setBlockToken(final LocatedBlock b,
+      final BlockTokenSecretManager.AccessMode mode) throws IOException {
+    if (isBlockTokenEnabled) {
+      b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(), 
+          EnumSet.of(mode)));
+    }    
+  }
+
+  void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
+      final DatanodeDescriptor nodeinfo) {
+    // check access key update
+    if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+      cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
+      nodeinfo.needKeyUpdate = false;
+    }
+  }
+
+  /**
+   * Clamp the specified replication between the minimum and the maximum
+   * replication levels.
+   */
+  public short adjustReplication(short replication) {
+    return replication < minReplication? minReplication
+        : replication > maxReplication? maxReplication: replication;
   }
 
   /**
@@ -642,12 +670,30 @@ public class BlockManager {
                             minReplication);
   }
 
-   /** Get all blocks with location information from a datanode. */
-  public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
+  /**
+   * return a list of blocks & their locations on <code>datanode</code> whose
+   * total size is <code>size</code>
+   * 
+   * @param datanode on which blocks are located
+   * @param size total size of blocks
+   */
+  public BlocksWithLocations getBlocks(DatanodeID datanode, long size
+      ) throws IOException {
+    namesystem.readLock();
+    try {
+      namesystem.checkSuperuserPrivilege();
+      return getBlocksWithLocations(datanode, size);  
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  /** Get all blocks with location information from a datanode. */
+  private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
       final long size) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+      NameNode.stateChangeLog.warn("BLOCK* getBlocks: "
           + "Asking for blocks from an unrecorded node " + datanode.getName());
       throw new HadoopIllegalArgumentException(
           "Datanode " + datanode.getName() + " not found.");
@@ -685,8 +731,8 @@ public class BlockManager {
   }
 
    
-  /** Remove a datanode. */
-  public void removeDatanode(final DatanodeDescriptor node) {
+  /** Remove the blocks associated to the given datanode. */
+  void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
     final Iterator<? extends Block> it = node.getBlockIterator();
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
@@ -694,11 +740,6 @@ public class BlockManager {
 
     node.resetBlocks();
     removeFromInvalidates(node.getStorageID());
-    datanodeManager.getNetworkTopology().remove(node);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("remove datanode " + node.getName());
-    }
   }
   
   private void removeFromInvalidates(String storageID, Block block) {
@@ -724,7 +765,7 @@ public class BlockManager {
    * @param dn datanode
    * @param log true to create an entry in the log 
    */
-  void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
+  private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
     Collection<Block> invalidateSet = recentInvalidateSets
         .get(dn.getStorageID());
     if (invalidateSet == null) {
@@ -734,7 +775,7 @@ public class BlockManager {
     if (invalidateSet.add(b)) {
       pendingDeletionBlocksCount++;
       if (log) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+        NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
             + b + " to " + dn.getName());
       }
     }
@@ -747,7 +788,7 @@ public class BlockManager {
    * @param b block
    * @param dn datanode
    */
-  public void addToInvalidates(Block b, DatanodeInfo dn) {
+  void addToInvalidates(Block b, DatanodeInfo dn) {
     addToInvalidates(b, dn, true);
   }
 
@@ -764,7 +805,7 @@ public class BlockManager {
       datanodes.append(node.getName()).append(" ");
     }
     if (datanodes.length() != 0) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+      NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
           + b + " to " + datanodes.toString());
     }
   }
@@ -788,20 +829,29 @@ public class BlockManager {
     }
   }
 
-  public void findAndMarkBlockAsCorrupt(Block blk,
-                                 DatanodeInfo dn) throws IOException {
-    BlockInfo storedBlock = getStoredBlock(blk);
-    if (storedBlock == null) {
-      // Check if the replica is in the blockMap, if not
-      // ignore the request for now. This could happen when BlockScanner
-      // thread of Datanode reports bad block before Block reports are sent
-      // by the Datanode on startup
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.markBlockAsCorrupt: " +
-                                   "block " + blk + " could not be marked as " +
-                                   "corrupt as it does not exist in blocksMap");
-      return;
+  /**
+   * Mark the block belonging to datanode as corrupt
+   * @param blk Block to be marked as corrupt
+   * @param dn Datanode which holds the corrupt replica
+   */
+  public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
+      final DatanodeInfo dn) throws IOException {
+    namesystem.writeLock();
+    try {
+      final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
+      if (storedBlock == null) {
+        // Check if the replica is in the blockMap, if not
+        // ignore the request for now. This could happen when BlockScanner
+        // thread of Datanode reports bad block before Block reports are sent
+        // by the Datanode on startup
+        NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
+            + blk + " not found.");
+        return;
+      }
+      markBlockAsCorrupt(storedBlock, dn);
+    } finally {
+      namesystem.writeUnlock();
     }
-    markBlockAsCorrupt(storedBlock, dn);
   }
 
   private void markBlockAsCorrupt(BlockInfo storedBlock,
@@ -817,7 +867,7 @@ public class BlockManager {
 
     INodeFile inode = storedBlock.getINode();
     if (inode == null) {
-      NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
+      NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " +
                                    "block " + storedBlock +
                                    " could not be marked as corrupt as it" +
                                    " does not belong to any file");
@@ -844,13 +894,12 @@ public class BlockManager {
    */
   private void invalidateBlock(Block blk, DatanodeInfo dn)
       throws IOException {
-    NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+    NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
                                  + blk + " on " + dn.getName());
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot invalidate block " + blk +
-                            " because datanode " + dn.getName() +
-                            " does not exist.");
+      throw new IOException("Cannot invalidate block " + blk
+          + " because datanode " + dn.getName() + " does not exist.");
     }
 
     // Check how many copies we have of the block. If we have at least one
@@ -860,14 +909,12 @@ public class BlockManager {
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, node);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
-            + blk + " on "
-            + dn.getName() + " listed for deletion.");
+        NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
+            + blk + " on " + dn.getName() + " listed for deletion.");
       }
     } else {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-          + blk + " on " + dn.getName()
-          + " is the only copy and was not deleted.");
+      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
+          + dn.getName() + " is the only copy and was not deleted.");
     }
   }
 
@@ -887,7 +934,7 @@ public class BlockManager {
    * @param nodesToProcess number of datanodes to schedule deletion work
    * @return total number of block for deletion
    */
-  public int computeInvalidateWork(int nodesToProcess) {
+  int computeInvalidateWork(int nodesToProcess) {
     int numOfNodes = recentInvalidateSets.size();
     nodesToProcess = Math.min(numOfNodes, nodesToProcess);
 
@@ -927,7 +974,7 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  public int computeReplicationWork(int blocksToProcess) throws IOException {
+  private int computeReplicationWork(int blocksToProcess) throws IOException {
     // Choose the blocks to be replicated
     List<List<Block>> blocksToReplicate =
       chooseUnderReplicatedBlocks(blocksToProcess);
@@ -1174,12 +1221,13 @@ public class BlockManager {
     final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
         src, numOfReplicas, client, excludedNodes, blocksize);
     if (targets.length < minReplication) {
-      throw new IOException("File " + src + " could only be replicated to " +
-                            targets.length + " nodes, instead of " +
-                            minReplication + ". There are "
-                            + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
-                            + " datanode(s) running but "+excludedNodes.size() +
-                            " node(s) are excluded in this operation.");
+      throw new IOException("File " + src + " could only be replicated to "
+          + targets.length + " nodes instead of minReplication (="
+          + minReplication + ").  There are "
+          + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
+          + " datanode(s) running and "
+          + (excludedNodes == null? "no": excludedNodes.size())
+          + " node(s) are excluded in this operation.");
     }
     return targets;
   }
@@ -1299,20 +1347,51 @@ public class BlockManager {
   }
 
   /**
-   * The given node is reporting all its blocks.  Use this info to
-   * update the (datanode-->blocklist) and (block-->nodelist) tables.
+   * The given datanode is reporting all its blocks.
+   * Update the (machine-->blocklist) and (block-->machinelist) maps.
    */
-  public void processReport(DatanodeDescriptor node, BlockListAsLongs report) 
-  throws IOException {
-    
-    boolean isFirstBlockReport = (node.numBlocks() == 0);
-    if (isFirstBlockReport) {
-      // Initial block reports can be processed a lot more efficiently than
-      // ordinary block reports.  This shortens NN restart times.
-      processFirstBlockReport(node, report);
-      return;
-    } 
+  public void processReport(final DatanodeID nodeID, final String poolId,
+      final BlockListAsLongs newReport) throws IOException {
+    namesystem.writeLock();
+    final long startTime = Util.now(); //after acquiring write lock
+    final long endTime;
+    try {
+      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException("ProcessReport from dead or unregistered node: "
+                              + nodeID.getName());
+      }
 
+      // To minimize startup time, we discard any second (or later) block reports
+      // that we receive while still in startup phase.
+      if (namesystem.isInStartupSafeMode() && node.numBlocks() > 0) {
+        NameNode.stateChangeLog.info("BLOCK* processReport: "
+            + "discarded non-initial block report from " + nodeID.getName()
+            + " because namenode still in startup phase");
+        return;
+      }
+
+      if (node.numBlocks() == 0) {
+        // The first block report can be processed a lot more efficiently than
+        // ordinary block reports.  This shortens restart times.
+        processFirstBlockReport(node, newReport);
+      } else {
+        processReport(node, newReport);
+      }
+    } finally {
+      endTime = Util.now();
+      namesystem.writeUnlock();
+    }
+
+    // Log the block report processing stats from Namenode perspective
+    NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
+    NameNode.stateChangeLog.info("BLOCK* processReport: from "
+        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
+  }
+
+  private void processReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -1335,7 +1414,7 @@ public class BlockManager {
       addStoredBlock(b, node, null, true);
     }
     for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
+      NameNode.stateChangeLog.info("BLOCK* processReport: block "
           + b + " on " + node.getName() + " size " + b.getNumBytes()
           + " does not belong to any file.");
       addToInvalidates(b, node);
@@ -1356,8 +1435,8 @@ public class BlockManager {
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report) 
-  throws IOException {
+  private void processFirstBlockReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
     assert (node.numBlocks() == 0);
@@ -1454,12 +1533,12 @@ public class BlockManager {
    * @param toUC replicas of blocks currently under construction
    * @return
    */
-  BlockInfo processReportedBlock(DatanodeDescriptor dn, 
-      Block block, ReplicaState reportedState, 
-      Collection<BlockInfo> toAdd, 
-      Collection<Block> toInvalidate, 
-      Collection<BlockInfo> toCorrupt,
-      Collection<StatefulBlockInfo> toUC) {
+  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+      final Block block, final ReplicaState reportedState, 
+      final Collection<BlockInfo> toAdd, 
+      final Collection<Block> toInvalidate, 
+      final Collection<BlockInfo> toCorrupt,
+      final Collection<StatefulBlockInfo> toUC) {
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("Reported block " + block
@@ -1629,11 +1708,9 @@ public class BlockManager {
     }
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                   + "addStoredBlock request received for "
-                                   + block + " on " + node.getName()
-                                   + " size " + block.getNumBytes()
-                                   + " But it does not belong to any file.");
+      NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
+          + node.getName() + " size " + block.getNumBytes()
+          + " but it does not belong to any file.");
       // we could add this block to invalidate set of this datanode.
       // it will happen in next block report otherwise.
       return block;
@@ -1649,13 +1726,13 @@ public class BlockManager {
     if (added) {
       curReplicaDelta = 1;
       if (logEveryBlock) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+        NameNode.stateChangeLog.info("BLOCK* addStoredBlock: "
             + "blockMap updated: " + node.getName() + " is added to " + 
             storedBlock + " size " + storedBlock.getNumBytes());
       }
     } else {
       curReplicaDelta = 0;
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
+      NameNode.stateChangeLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node.getName() + " size " + storedBlock.getNumBytes());
     }
@@ -1752,52 +1829,76 @@ public class BlockManager {
    * over or under replicated. Place it into the respective queue.
    */
   public void processMisReplicatedBlocks() {
+    assert namesystem.hasWriteLock();
+
     long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
-    namesystem.writeLock();
-    try {
-      neededReplications.clear();
-      for (BlockInfo block : blocksMap.getBlocks()) {
-        INodeFile fileINode = block.getINode();
-        if (fileINode == null) {
-          // block does not belong to any file
-          nrInvalid++;
-          addToInvalidates(block);
-          continue;
-        }
-        // calculate current replication
-        short expectedReplication = fileINode.getReplication();
-        NumberReplicas num = countNodes(block);
-        int numCurrentReplica = num.liveReplicas();
-        // add to under-replicated queue if need to be
-        if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
-          if (neededReplications.add(block, numCurrentReplica, num
-              .decommissionedReplicas(), expectedReplication)) {
-            nrUnderReplicated++;
-          }
+    neededReplications.clear();
+    for (BlockInfo block : blocksMap.getBlocks()) {
+      INodeFile fileINode = block.getINode();
+      if (fileINode == null) {
+        // block does not belong to any file
+        nrInvalid++;
+        addToInvalidates(block);
+        continue;
+      }
+      // calculate current replication
+      short expectedReplication = fileINode.getReplication();
+      NumberReplicas num = countNodes(block);
+      int numCurrentReplica = num.liveReplicas();
+      // add to under-replicated queue if need to be
+      if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+        if (neededReplications.add(block, numCurrentReplica, num
+            .decommissionedReplicas(), expectedReplication)) {
+          nrUnderReplicated++;
         }
+      }
 
-        if (numCurrentReplica > expectedReplication) {
-          // over-replicated block
-          nrOverReplicated++;
-          processOverReplicatedBlock(block, expectedReplication, null, null);
-        }
+      if (numCurrentReplica > expectedReplication) {
+        // over-replicated block
+        nrOverReplicated++;
+        processOverReplicatedBlock(block, expectedReplication, null, null);
       }
-    } finally {
-      namesystem.writeUnlock();
     }
+
     LOG.info("Total number of blocks            = " + blocksMap.size());
     LOG.info("Number of invalid blocks          = " + nrInvalid);
     LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
     LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
   }
 
+  /** Set replication for the blocks. */
+  public void setReplication(final short oldRepl, final short newRepl,
+      final String src, final Block... blocks) throws IOException {
+    if (newRepl == oldRepl) {
+      return;
+    }
+
+    // update needReplication priority queues
+    for(Block b : blocks) {
+      updateNeededReplications(b, 0, newRepl-oldRepl);
+    }
+      
+    if (oldRepl > newRepl) {
+      // old replication > the new one; need to remove copies
+      LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+      for(Block b : blocks) {
+        processOverReplicatedBlock(b, newRepl, null, null);
+      }
+    } else { // replication factor is increased
+      LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+    }
+  }
+
   /**
    * Find how many of the containing nodes are "extra", if any.
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  public void processOverReplicatedBlock(Block block, short replication,
-      DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+  private void processOverReplicatedBlock(final Block block,
+      final short replication, final DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
@@ -1819,12 +1920,112 @@ public class BlockManager {
         }
       }
     }
-    namesystem.chooseExcessReplicates(nonExcess, block, replication, 
+    chooseExcessReplicates(nonExcess, block, replication, 
         addedNode, delNodeHint, blockplacement);
   }
 
 
-  public void addToExcessReplicate(DatanodeInfo dn, Block block) {
+  /**
+   * We want "replication" replicates for the block, but we now have too many.  
+   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
+   *
+   * srcNodes.size() - dstNodes.size() == replication
+   *
+   * We pick node that make sure that replicas are spread across racks and
+   * also try hard to pick one with least free space.
+   * The algorithm is first to pick a node with least free space from nodes
+   * that are on a rack holding more than one replicas of the block.
+   * So removing such a replica won't remove a rack. 
+   * If no such a node is available,
+   * then pick a node with least free space
+   */
+  private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
+                              Block b, short replication,
+                              DatanodeDescriptor addedNode,
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
+    assert namesystem.hasWriteLock();
+    // first form a rack to datanodes map and
+    INodeFile inode = getINode(b);
+    final Map<String, List<DatanodeDescriptor>> rackMap
+        = new HashMap<String, List<DatanodeDescriptor>>();
+    for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
+        iter.hasNext(); ) {
+      final DatanodeDescriptor node = iter.next();
+      final String rackName = node.getNetworkLocation();
+      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rackName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    // split nodes into two sets
+    // priSet contains nodes on rack with more than one replica
+    // remains contains the remaining nodes
+    final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
+    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+      if (datanodeList.size() == 1 ) {
+        remains.add(datanodeList.get(0));
+      } else {
+        priSet.addAll(datanodeList);
+      }
+    }
+    
+    // pick one node to delete that favors the delete hint
+    // otherwise pick one with least space from priSet if it is not empty
+    // otherwise one node with least space from remains
+    boolean firstOne = true;
+    while (nonExcess.size() - replication > 0) {
+      // check if we can delete delNodeHint
+      final DatanodeInfo cur;
+      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
+          && (priSet.contains(delNodeHint)
+              || (addedNode != null && !priSet.contains(addedNode))) ) {
+        cur = delNodeHint;
+      } else { // regular excessive replica removal
+        cur = replicator.chooseReplicaToDelete(inode, b, replication,
+            priSet, remains);
+      }
+      firstOne = false;
+
+      // adjust rackmap, priSet, and remains
+      String rack = cur.getNetworkLocation();
+      final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+      datanodes.remove(cur);
+      if (datanodes.isEmpty()) {
+        rackMap.remove(rack);
+      }
+      if (priSet.remove(cur)) {
+        if (datanodes.size() == 1) {
+          priSet.remove(datanodes.get(0));
+          remains.add(datanodes.get(0));
+        }
+      } else {
+        remains.remove(cur);
+      }
+
+      nonExcess.remove(cur);
+      addToExcessReplicate(cur, b);
+
+      //
+      // The 'excessblocks' tracks blocks until we get confirmation
+      // that the datanode has deleted them; the only way we remove them
+      // is when we get a "removeBlock" message.  
+      //
+      // The 'invalidate' list is used to inform the datanode the block 
+      // should be deleted.  Items are removed from the invalidate list
+      // upon giving instructions to the namenode.
+      //
+      addToInvalidates(b, cur);
+      NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
+                +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
+    }
+  }
+
+  private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
@@ -1834,7 +2035,7 @@ public class BlockManager {
     if (excessBlocks.add(block)) {
       excessBlocksCount++;
       if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates:"
+        NameNode.stateChangeLog.debug("BLOCK* addToExcessReplicate:"
             + " (" + dn.getName() + ", " + block
             + ") is added to excessReplicateMap");
       }
@@ -1847,14 +2048,14 @@ public class BlockManager {
    */
   private void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+      NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
           + block + " from " + node.getName());
     }
     assert (namesystem.hasWriteLock());
     {
       if (!blocksMap.removeNode(block, node)) {
         if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+          NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
               + block + " has already been removed from node " + node);
         }
         return;
@@ -1882,8 +2083,7 @@ public class BlockManager {
         if (excessBlocks.remove(block)) {
           excessBlocksCount--;
           if(NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "BLOCK* NameSystem.removeStoredBlock: "
+            NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
                 + block + " is removed from excessBlocks");
           }
           if (excessBlocks.size() == 0) {
@@ -1915,7 +2115,7 @@ public class BlockManager {
   /**
    * The given node is reporting that it received a certain block.
    */
-  public void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  private void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
     node.decBlocksScheduled();
@@ -1925,9 +2125,8 @@ public class BlockManager {
     if (delHint != null && delHint.length() != 0) {
       delHintNode = datanodeManager.getDatanode(delHint);
       if (delHintNode == null) {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
-            + block + " is expected to be removed from an unrecorded node "
-            + delHint);
+        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + block
+            + " is expected to be removed from an unrecorded node " + delHint);
       }
     }
 
@@ -1955,7 +2154,7 @@ public class BlockManager {
       addStoredBlock(b, node, delHintNode, true);
     }
     for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+      NameNode.stateChangeLog.info("BLOCK* addBlock: block "
           + b + " on " + node.getName() + " size " + b.getNumBytes()
           + " does not belong to any file.");
       addToInvalidates(b, node);
@@ -1965,6 +2164,30 @@ public class BlockManager {
     }
   }
 
+  /** The given node is reporting that it received a certain block. */
+  public void blockReceived(final DatanodeID nodeID, final String poolId,
+      final Block block, final String delHint) throws IOException {
+    namesystem.writeLock();
+    try {
+      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        final String s = block + " is received from dead or unregistered node "
+            + nodeID.getName();
+        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
+        throw new IOException(s);
+      } 
+
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
+            + " is received from " + nodeID.getName());
+      }
+
+      addBlock(node, block, delHint);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
   /**
    * Return the number of nodes that are live and decommissioned.
    */
@@ -2047,7 +2270,7 @@ public class BlockManager {
    * On stopping decommission, check if the node has excess replicas.
    * If there are any excess replicas, call processOverReplicatedBlock()
    */
-  private void processOverReplicatedBlocksOnReCommission(
+  void processOverReplicatedBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     while(it.hasNext()) {
@@ -2145,9 +2368,19 @@ public class BlockManager {
     return blocksMap.getStoredBlock(block);
   }
 
-  /* updates a block in under replication queue */
-  public void updateNeededReplications(Block block, int curReplicasDelta,
-      int expectedReplicasDelta) {
+
+  /** Should the access keys be updated? */
+  boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
+    final boolean b = isBlockTokenEnabled && blockKeyUpdateInterval < updateTime;
+    if (b) {
+      blockTokenSecretManager.updateKeys();
+    }
+    return b;
+  }
+
+  /** updates a block in under replication queue */
+  private void updateNeededReplications(final Block block,
+      final int curReplicasDelta, int expectedReplicasDelta) {
     namesystem.writeLock();
     try {
       NumberReplicas repl = countNodes(block);
@@ -2306,8 +2539,9 @@ public class BlockManager {
     return blocksMap.getINode(b);
   }
 
-  public void removeFromCorruptReplicasMap(Block block) {
-    corruptReplicas.removeFromCorruptReplicasMap(block);
+  /** @return an iterator of the datanodes. */
+  public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
+    return blocksMap.nodeIterator(block);
   }
 
   public int numCorruptReplicas(Block block) {
@@ -2316,6 +2550,8 @@ public class BlockManager {
 
   public void removeBlockFromMap(Block block) {
     blocksMap.removeBlock(block);
+    // If block is removed from blocksMap remove it from corruptReplicasMap
+    corruptReplicas.removeFromCorruptReplicasMap(block);
   }
 
   public int getCapacity() {
@@ -2350,63 +2586,22 @@ public class BlockManager {
   /**
    * Return an iterator over the set of blocks for which there are no replicas.
    */
-  public BlockIterator getCorruptReplicaBlockIterator() {
-    return neededReplications
-        .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
-  }
-
-  /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed. Return true if decommission is complete.
-   */
-  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
-    // Check to see if all blocks in this decommissioned
-    // node has reached their target replication factor.
-    if (node.isDecommissionInProgress()) {
-      if (!isReplicationInProgress(node)) {
-        node.setDecommissioned();
-        LOG.info("Decommission complete for node " + node.getName());
-      }
-    }
-    return node.isDecommissioned();
+  public Iterator<Block> getCorruptReplicaBlockIterator() {
+    return neededReplications.iterator(
+        UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
 
-  /** Start decommissioning the specified datanode. */
-  void startDecommission(DatanodeDescriptor node) throws IOException {
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
-          node.numBlocks() +  " blocks.");
-      synchronized (namesystem.heartbeats) {
-        namesystem.updateStats(node, false);
-        node.startDecommission();
-        namesystem.updateStats(node, true);
-      }
-      node.decommissioningStatus.setStartTime(now());
-      
-      // all the blocks that reside on this node have to be replicated.
-      checkDecommissionStateInternal(node);
-    }
-  }
-
-  /** Stop decommissioning the specified datanodes. */
-  void stopDecommission(DatanodeDescriptor node) throws IOException {
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      LOG.info("Stop Decommissioning node " + node.getName());
-      synchronized (namesystem.heartbeats) {
-        namesystem.updateStats(node, false);
-        node.stopDecommission();
-        namesystem.updateStats(node, true);
-      }
-      processOverReplicatedBlocksOnReCommission(node);
-    }
+  /** @return the size of UnderReplicatedBlocks */
+  public int numOfUnderReplicatedBlocks() {
+    return neededReplications.size();
   }
 
   /**
    * Periodically calls computeReplicationWork().
    */
   private class ReplicationMonitor implements Runnable {
-    static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
-    static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+    private static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+    private static final int REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
 
     @Override
     public void run() {
@@ -2439,8 +2634,6 @@ public class BlockManager {
    */
   int computeDatanodeWork() throws IOException {
     int workFound = 0;
-    int blocksToProcess = 0;
-    int nodesToProcess = 0;
     // Blocks should not be replicated or removed if in safe mode.
     // It's OK to check safe mode here w/o holding lock, in the worst
     // case extra replications will be scheduled, and these will get
@@ -2448,11 +2641,11 @@ public class BlockManager {
     if (namesystem.isInSafeMode())
       return workFound;
 
-    synchronized (namesystem.heartbeats) {
-      blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
-      nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
-          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
-    }
+    final int numlive = heartbeatManager.getLiveDatanodeCount();
+    final int blocksToProcess = numlive
+        * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
+    final int nodesToProcess = (int) Math.ceil(numlive
+        * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
 
     workFound = this.computeReplicationWork(blocksToProcess);
 

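The chooseExcessReplicates() method inlined into BlockManager above first groups the replica holders of a block by rack and splits them into a priority set (nodes on racks that hold more than one replica, so a deletion there cannot lose a rack) and the remaining nodes, before asking the placement policy which replica to delete. A simplified, self-contained sketch of that partitioning step follows; the class and method names are illustrative only and not part of the patch:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class RackPartitionSketch {
      /**
       * Split replica holders into "priSet" (nodes on racks holding more than
       * one replica of the block) and "remains" (sole replica on its rack),
       * mirroring the grouping done by chooseExcessReplicates().
       *
       * @param nodeToRack maps a datanode name to its rack name
       */
      static void partition(Map<String, String> nodeToRack,
          List<String> priSet, List<String> remains) {
        // rack name -> datanodes holding this block on that rack
        Map<String, List<String>> rackMap = new HashMap<String, List<String>>();
        for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
          List<String> datanodeList = rackMap.get(e.getValue());
          if (datanodeList == null) {
            datanodeList = new ArrayList<String>();
            rackMap.put(e.getValue(), datanodeList);
          }
          datanodeList.add(e.getKey());
        }
        // racks with a single replica go to "remains"; all others to "priSet"
        for (List<String> datanodeList : rackMap.values()) {
          if (datanodeList.size() == 1) {
            remains.add(datanodeList.get(0));
          } else {
            priSet.addAll(datanodeList);
          }
        }
      }
    }

Preferring deletions from priSet keeps replicas spread across racks for as long as possible.
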
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Aug 16 00:37:15 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;

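Earlier in this patch BlockManager narrows minReplication and maxReplication to short, validates them against each other at construction time, and adds adjustReplication() to clamp a requested replication factor into that range. A compact sketch of the same validation and clamping, with an illustrative class name that is not part of the patch:

    import java.io.IOException;

    class ReplicationBoundsSketch {
      private final short minReplication;
      private final short maxReplication;

      ReplicationBoundsSketch(int minR, int maxR) throws IOException {
        // same checks as the BlockManager constructor hunk above
        if (minR <= 0)
          throw new IOException("dfs.namenode.replication.min = " + minR + " <= 0");
        if (maxR > Short.MAX_VALUE)
          throw new IOException("dfs.replication.max = " + maxR + " > " + Short.MAX_VALUE);
        if (minR > maxR)
          throw new IOException("dfs.namenode.replication.min = " + minR
              + " > dfs.replication.max = " + maxR);
        this.minReplication = (short) minR;
        this.maxReplication = (short) maxR;
      }

      /** Clamp the requested replication into [minReplication, maxReplication]. */
      short adjustReplication(short replication) {
        return replication < minReplication ? minReplication
            : replication > maxReplication ? maxReplication : replication;
      }
    }

For example, with dfs.namenode.replication.min = 1 and dfs.replication.max = 512, a requested replication of 1024 is clamped to 512 and a request of 0 is raised to 1.
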
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Tue Aug 16 00:37:15 2011
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.util.Light
  * block's metadata currently includes INode it belongs to and
  * the datanodes that store the block.
  */
-public class BlocksMap {
+class BlocksMap {
   private static class NodeIterator implements Iterator<DatanodeDescriptor> {
     private BlockInfo blockInfo;
     private int nextIdx = 0;
@@ -101,7 +101,7 @@ public class BlocksMap {
   /**
    * Add block b belonging to the specified file inode to the map.
    */
-  public BlockInfo addINode(BlockInfo b, INodeFile iNode) {
+  BlockInfo addINode(BlockInfo b, INodeFile iNode) {
     BlockInfo info = blocks.get(b);
     if (info != b) {
       info = b;
@@ -137,7 +137,7 @@ public class BlocksMap {
    * Searches for the block in the BlocksMap and 
    * returns Iterator that iterates through the nodes the block belongs to.
    */
-  public Iterator<DatanodeDescriptor> nodeIterator(Block b) {
+  Iterator<DatanodeDescriptor> nodeIterator(Block b) {
     return nodeIterator(blocks.get(b));
   }
 
@@ -182,27 +182,6 @@ public class BlocksMap {
   Iterable<BlockInfo> getBlocks() {
     return blocks;
   }
-
-  /**
-   * Check if the block exists in map
-   */
-  public boolean contains(Block block) {
-    return blocks.contains(block);
-  }
-  
-  /**
-   * Check if the replica at the given datanode exists in map
-   */
-  boolean contains(Block block, DatanodeDescriptor datanode) {
-    BlockInfo info = blocks.get(block);
-    if (info == null)
-      return false;
-    
-    if (-1 == info.findDatanode(datanode))
-      return false;
-    
-    return true;
-  }
   
   /** Get the capacity of the HashMap that stores blocks */
   int getCapacity() {

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Aug 16 00:37:15 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
@@ -32,6 +34,7 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -56,13 +60,12 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
+import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.ScriptBasedMapping;
-import org.apache.hadoop.util.CyclicIteration;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -75,7 +78,10 @@ import org.apache.hadoop.util.Reflection
 public class DatanodeManager {
   static final Log LOG = LogFactory.getLog(DatanodeManager.class);
 
-  final FSNamesystem namesystem;
+  private final FSNamesystem namesystem;
+  private final BlockManager blockManager;
+
+  private final HeartbeatManager heartbeatManager;
 
   /**
    * Stores the datanode -> block map.  
@@ -117,9 +123,14 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
-  DatanodeManager(final FSNamesystem namesystem, final Configuration conf
+  DatanodeManager(final BlockManager blockManager,
+      final FSNamesystem namesystem, final Configuration conf
       ) throws IOException {
     this.namesystem = namesystem;
+    this.blockManager = blockManager;
+
+    this.heartbeatManager = new HeartbeatManager(namesystem, conf);
+
     this.hostsReader = new HostsFileReader(
         conf.get(DFSConfigKeys.DFS_HOSTS, ""),
         conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
@@ -158,17 +169,30 @@ public class DatanodeManager {
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, 
                     DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
     decommissionthread.start();
+
+    heartbeatManager.activate(conf);
   }
 
   void close() {
     if (decommissionthread != null) decommissionthread.interrupt();
+    heartbeatManager.close();
   }
 
   /** @return the network topology. */
   public NetworkTopology getNetworkTopology() {
     return networktopology;
   }
-  
+
+  /** @return the heartbeat manager. */
+  HeartbeatManager getHeartbeatManager() {
+    return heartbeatManager;
+  }
+
+  /** @return the datanode statistics. */
+  public DatanodeStatistics getDatanodeStatistics() {
+    return heartbeatManager;
+  }
+
   /** Sort the located blocks by the distance to the target host. */
   public void sortLocatedBlocks(final String targethost,
       final List<LocatedBlock> locatedblocks) {
@@ -231,9 +255,44 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * Remove a datanode descriptor.
+   * @param nodeInfo datanode descriptor.
+   */
+  private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    assert namesystem.hasWriteLock();
+    heartbeatManager.removeDatanode(nodeInfo);
+    blockManager.removeBlocksAssociatedTo(nodeInfo);
+    networktopology.remove(nodeInfo);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("remove datanode " + nodeInfo.getName());
+    }
+    namesystem.checkSafeMode();
+  }
+
+  /**
+   * Remove a datanode
+   * @throws UnregisteredNodeException 
+   */
+  public void removeDatanode(final DatanodeID node
+      ) throws UnregisteredNodeException {
+    namesystem.writeLock();
+    try {
+      final DatanodeDescriptor descriptor = getDatanode(node);
+      if (descriptor != null) {
+        removeDatanode(descriptor);
+      } else {
+        NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
+                                     + node.getName() + " does not exist");
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
   /** Remove a dead datanode. */
-  public void removeDeadDatanode(final DatanodeID nodeID) {
-    synchronized(namesystem.heartbeats) {
+  void removeDeadDatanode(final DatanodeID nodeID) {
       synchronized(datanodeMap) {
         DatanodeDescriptor d;
         try {
@@ -244,14 +303,13 @@ public class DatanodeManager {
         if (d != null && isDatanodeDead(d)) {
           NameNode.stateChangeLog.info(
               "BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
-          namesystem.removeDatanode(d);
+          removeDatanode(d);
         }
       }
-    }
   }
 
   /** Is the datanode dead? */
-  public boolean isDatanodeDead(DatanodeDescriptor node) {
+  boolean isDatanodeDead(DatanodeDescriptor node) {
     return (node.getLastUpdate() <
             (Util.now() - heartbeatExpireInterval));
   }
@@ -348,7 +406,7 @@ public class DatanodeManager {
    * @param nodeList
    *          , array list of live or dead nodes.
    */
-  public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
+  private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
     // If the include list is empty, all nodes are allowed, so it does not
     // make sense to exclude any node from the cluster; there is nothing to remove.
     if (hostsReader.getHosts().isEmpty()) {
@@ -423,11 +481,48 @@ public class DatanodeManager {
     throws IOException {
     // If the registered node is in exclude list, then decommission it
     if (inExcludedHostsList(nodeReg, ipAddr)) {
-      namesystem.getBlockManager().startDecommission(nodeReg);
+      startDecommission(nodeReg);
+    }
+  }
+
+  /**
+   * Change, if appropriate, the admin state of a datanode to 
+   * decommission completed. Return true if decommission is complete.
+   */
+  boolean checkDecommissionState(DatanodeDescriptor node) {
+    // Check to see if all blocks in this decommissioned
+    // node have reached their target replication factor.
+    if (node.isDecommissionInProgress()) {
+      if (!blockManager.isReplicationInProgress(node)) {
+        node.setDecommissioned();
+        LOG.info("Decommission complete for node " + node.getName());
+      }
+    }
+    return node.isDecommissioned();
+  }
+
+  /** Start decommissioning the specified datanode. */
+  private void startDecommission(DatanodeDescriptor node) throws IOException {
+    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
+          node.numBlocks() +  " blocks.");
+      heartbeatManager.startDecommission(node);
+      node.decommissioningStatus.setStartTime(now());
+      
+      // all the blocks that reside on this node have to be replicated.
+      checkDecommissionState(node);
+    }
+  }
+
+  /** Stop decommissioning the specified datanodes. */
+  void stopDecommission(DatanodeDescriptor node) throws IOException {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stop Decommissioning node " + node.getName());
+      heartbeatManager.stopDecommission(node);
+      blockManager.processOverReplicatedBlocksOnReCommission(node);
     }
   }
 
-  
   /**
    * Generate new storage ID.
    * 
@@ -469,7 +564,7 @@ public class DatanodeManager {
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = namesystem.getBlockManager().getBlockKeys();
+    nodeReg.exportedKeys = blockManager.getBlockKeys();
       
     NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
         + "node registration from " + nodeReg.getName()
@@ -483,7 +578,7 @@ public class DatanodeManager {
                         + "node from name: " + nodeN.getName());
       // nodeN previously served a different data storage, 
       // which is not served by anybody anymore.
-      namesystem.removeDatanode(nodeN);
+      removeDatanode(nodeN);
       // physically remove node from datanodeMap
       wipeDatanode(nodeN);
       nodeN = null;
@@ -525,14 +620,7 @@ public class DatanodeManager {
       getNetworkTopology().add(nodeS);
         
       // also treat the registration message as a heartbeat
-      synchronized(namesystem.heartbeats) {
-        if( !namesystem.heartbeats.contains(nodeS)) {
-          namesystem.heartbeats.add(nodeS);
-          //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
-          nodeS.isAlive = true;
-        }
-      }
+      heartbeatManager.register(nodeS);
       checkDecommissioning(nodeS, dnAddress);
       return;
     } 
@@ -556,16 +644,29 @@ public class DatanodeManager {
     checkDecommissioning(nodeDescr, dnAddress);
     
     // also treat the registration message as a heartbeat
-    synchronized(namesystem.heartbeats) {
-      namesystem.heartbeats.add(nodeDescr);
-      nodeDescr.isAlive = true;
-      // no need to update its timestamp
-      // because its is done when the descriptor is created
+    // no need to update its timestamp
+    // because that is done when the descriptor is created
+    heartbeatManager.addDatanode(nodeDescr);
+  }
+
+  /**
+   * Rereads conf to get hosts and exclude list file names.
+   * Rereads the files to update the hosts and exclude lists.  It
+   * checks if any of the hosts have changed states:
+   */
+  public void refreshNodes(final Configuration conf) throws IOException {
+    namesystem.checkSuperuserPrivilege();
+    refreshHostsReader(conf);
+    namesystem.writeLock();
+    try {
+      refreshDatanodes();
+    } finally {
+      namesystem.writeUnlock();
     }
   }
 
   /** Reread include/exclude files. */
-  public void refreshHostsReader(Configuration conf) throws IOException {
+  private void refreshHostsReader(Configuration conf) throws IOException {
     // Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list.
     if (conf == null) {
@@ -577,24 +678,21 @@ public class DatanodeManager {
   }
   
   /**
-   * Rereads the config to get hosts and exclude list file names.
-   * Rereads the files to update the hosts and exclude lists.  It
-   * checks if any of the hosts have changed states:
    * 1. Added to hosts  --> no further work needed here.
    * 2. Removed from hosts --> mark AdminState as decommissioned. 
    * 3. Added to exclude --> start decommission.
    * 4. Removed from exclude --> stop decommission.
    */
-  public void refreshDatanodes() throws IOException {
+  private void refreshDatanodes() throws IOException {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node, null)) {
-        node.setDisallowed(true);  // case 2.
+        node.setDisallowed(true); // case 2.
       } else {
         if (inExcludedHostsList(node, null)) {
-          namesystem.getBlockManager().startDecommission(node);   // case 3.
+          startDecommission(node); // case 3.
         } else {
-          namesystem.getBlockManager().stopDecommission(node);   // case 4.
+          stopDecommission(node); // case 4.
         }
       }
     }
@@ -626,16 +724,59 @@ public class DatanodeManager {
     return numDead;
   }
 
+  /** @return list of datanodes where decommissioning is in progress. */
+  public List<DatanodeDescriptor> getDecommissioningNodes() {
+    namesystem.readLock();
+    try {
+      final List<DatanodeDescriptor> decommissioningNodes
+          = new ArrayList<DatanodeDescriptor>();
+      final List<DatanodeDescriptor> results = getDatanodeListForReport(
+          DatanodeReportType.LIVE);
+      for(DatanodeDescriptor node : results) {
+        if (node.isDecommissionInProgress()) {
+          decommissioningNodes.add(node);
+        }
+      }
+      return decommissioningNodes;
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+
   /** Fetch live and dead datanodes. */
-  public void fetchDatanodess(final List<DatanodeDescriptor> live, 
-      final List<DatanodeDescriptor> dead) {
-    final List<DatanodeDescriptor> results =
-        getDatanodeListForReport(DatanodeReportType.ALL);    
-    for(DatanodeDescriptor node : results) {
-      if (isDatanodeDead(node))
-        dead.add(node);
-      else
-        live.add(node);
+  public void fetchDatanodes(final List<DatanodeDescriptor> live, 
+      final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
+    if (live == null && dead == null) {
+      throw new HadoopIllegalArgumentException("Both live and dead lists are null");
+    }
+
+    namesystem.readLock();
+    try {
+      final List<DatanodeDescriptor> results =
+          getDatanodeListForReport(DatanodeReportType.ALL);    
+      for(DatanodeDescriptor node : results) {
+        if (isDatanodeDead(node)) {
+          if (dead != null) {
+            dead.add(node);
+          }
+        } else {
+          if (live != null) {
+            live.add(node);
+          }
+        }
+      }
+    } finally {
+      namesystem.readUnlock();
+    }
+    
+    if (removeDecommissionNode) {
+      if (live != null) {
+        removeDecomNodeFromList(live);
+      }
+      if (dead != null) {
+        removeDecomNodeFromList(dead);
+      }
     }
   }
 
@@ -712,7 +853,7 @@ public class DatanodeManager {
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int maxTransfers, int failedVolumes
       ) throws IOException {
-    synchronized (namesystem.heartbeats) {
+    synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
         DatanodeDescriptor nodeinfo = null;
         try {
@@ -731,10 +872,8 @@ public class DatanodeManager {
           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
 
-        namesystem.updateStats(nodeinfo, false);
-        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-            xceiverCount, failedVolumes);
-        namesystem.updateStats(nodeinfo, true);
+        heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
+            remaining, blockPoolUsed, xceiverCount, failedVolumes);
         
         //check lease recovery
         BlockInfoUnderConstruction[] blocks = nodeinfo
@@ -765,7 +904,7 @@ public class DatanodeManager {
               blockPoolId, blks));
         }
         
-        namesystem.addKeyUpdateCommand(cmds, nodeinfo);
+        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
         // check for balancer bandwidth update
         if (nodeinfo.getBalancerBandwidth() > 0) {

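The refreshNodes()/refreshDatanodes() javadoc above enumerates four cases for a node once the include and exclude files have been reread. A hedged sketch of that decision, written against hypothetical stand-in types rather than DatanodeDescriptor and HostsFileReader:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    class RefreshDecisionSketch {
      enum Admin { NORMAL, DECOMMISSION_IN_PROGRESS, DECOMMISSIONED, DISALLOWED }

      static class Node {
        final String name;
        Admin state = Admin.NORMAL;
        Node(String name) { this.name = name; }
      }

      // An empty include list admits every node, mirroring inHostsList().
      static void refresh(List<Node> nodes, Set<String> hosts, Set<String> excludes) {
        for (Node node : nodes) {
          if (!hosts.isEmpty() && !hosts.contains(node.name)) {
            node.state = Admin.DISALLOWED;                    // case 2: dropped from hosts
          } else if (excludes.contains(node.name)) {
            if (node.state == Admin.NORMAL) {
              node.state = Admin.DECOMMISSION_IN_PROGRESS;    // case 3: added to exclude
            }
          } else if (node.state == Admin.DECOMMISSION_IN_PROGRESS
              || node.state == Admin.DECOMMISSIONED) {
            node.state = Admin.NORMAL;                        // case 4: removed from exclude
          }                                                   // case 1: still included, no work
        }
      }

      public static void main(String[] args) {
        Node dn = new Node("dn1");
        refresh(Arrays.asList(dn), Collections.emptySet(), Collections.singleton("dn1"));
        System.out.println(dn.state);  // DECOMMISSION_IN_PROGRESS
      }
    }
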
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Tue Aug 16 00:37:15 2011
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.util.CyclicIteration;
 
 /**
  * Manage node decommissioning.
@@ -35,11 +34,9 @@ class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
   private final FSNamesystem fsnamesystem;
-  private final BlockManager blockManager;
 
-  DecommissionManager(FSNamesystem namesystem) {
+  DecommissionManager(final FSNamesystem namesystem) {
     this.fsnamesystem = namesystem;
-    this.blockManager = fsnamesystem.getBlockManager();
   }
 
   /** Periodically check decommission status. */
@@ -81,16 +78,16 @@ class DecommissionManager {
     }
     
     private void check() {
+      final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
       int count = 0;
       for(Map.Entry<String, DatanodeDescriptor> entry
-          : blockManager.getDatanodeManager().getDatanodeCyclicIteration(
-              firstkey)) {
+          : dm.getDatanodeCyclicIteration(firstkey)) {
         final DatanodeDescriptor d = entry.getValue();
         firstkey = entry.getKey();
 
         if (d.isDecommissionInProgress()) {
           try {
-            blockManager.checkDecommissionStateInternal(d);
+            dm.checkDecommissionState(d);
           } catch(Exception e) {
             LOG.warn("entry=" + entry, e);
           }

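The Monitor above walks the datanodes cyclically, remembering firstkey so each pass resumes where the previous one stopped and only a bounded number of nodes is examined per interval. A rough, list-based sketch of that resumable scan (the real code iterates the datanode map through getDatanodeCyclicIteration and calls checkDecommissionState on nodes that are decommissioning):

    import java.util.Arrays;
    import java.util.List;

    class CyclicScanSketch {
      private int resumeIndex = 0;   // analogue of the Monitor's firstkey

      /** Examine up to 'limit' nodes, then remember where to pick up next time. */
      void onePass(List<String> datanodes, int limit) {
        if (datanodes.isEmpty()) {
          return;
        }
        for (int examined = 0; examined < limit && examined < datanodes.size(); examined++) {
          int idx = (resumeIndex + examined) % datanodes.size();
          // this is where the real Monitor would check decommission progress
          System.out.println("checking " + datanodes.get(idx));
        }
        resumeIndex = (resumeIndex + Math.min(limit, datanodes.size())) % datanodes.size();
      }

      public static void main(String[] args) {
        CyclicScanSketch scan = new CyclicScanSketch();
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
        scan.onePass(nodes, 2);   // checks dn1, dn2
        scan.onePass(nodes, 2);   // resumes with dn3, then wraps to dn1
      }
    }
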
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Tue Aug 16 00:37:15 2011
@@ -17,21 +17,26 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
-/* Class for keeping track of under replication blocks
+/** Keep track of under replication blocks.
  * Blocks have a replication priority, with priority 0 indicating the highest.
  * Blocks with only one replica have the highest priority.
  */
-public class UnderReplicatedBlocks implements Iterable<Block> {
+class UnderReplicatedBlocks implements Iterable<Block> {
   static final int LEVEL = 5;
-  static public final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
-  private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
+  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+  private final List<NavigableSet<Block>> priorityQueues
+      = new ArrayList<NavigableSet<Block>>();
       
-  /* constructor */
+  /** Create an object. */
   UnderReplicatedBlocks() {
     for(int i=0; i<LEVEL; i++) {
       priorityQueues.add(new TreeSet<Block>());
@@ -47,8 +52,8 @@ public class UnderReplicatedBlocks imple
     }
   }
 
-  /* Return the total number of under replication blocks */
-  public synchronized int size() {
+  /** Return the total number of under replication blocks */
+  synchronized int size() {
     int size = 0;
     for (int i=0; i<LEVEL; i++) {
       size += priorityQueues.get(i).size();
@@ -56,7 +61,7 @@ public class UnderReplicatedBlocks imple
     return size;
   }
 
-  /* Return the number of under replication blocks excluding corrupt blocks */
+  /** Return the number of under replication blocks excluding corrupt blocks */
   synchronized int getUnderReplicatedBlockCount() {
     int size = 0;
     for (int i=0; i<QUEUE_WITH_CORRUPT_BLOCKS; i++) {
@@ -70,15 +75,15 @@ public class UnderReplicatedBlocks imple
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
   }
   
-  /* Check if a block is in the neededReplication queue */
-  public synchronized boolean contains(Block block) {
-    for(TreeSet<Block> set:priorityQueues) {
+  /** Check if a block is in the neededReplication queue */
+  synchronized boolean contains(Block block) {
+    for(NavigableSet<Block> set : priorityQueues) {
       if(set.contains(block)) { return true; }
     }
     return false;
   }
       
-  /* Return the priority of a block
+  /** Return the priority of a block
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
    * @param expectedReplicas expected number of replicas of the block
@@ -106,7 +111,7 @@ public class UnderReplicatedBlocks imple
     }
   }
       
-  /* add a block to a under replication queue according to its priority
+  /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
    * @param expectedReplicas expected number of replicas of the block
@@ -134,7 +139,7 @@ public class UnderReplicatedBlocks imple
     return false;
   }
 
-  /* remove a block from a under replication queue */
+  /** remove a block from a under replication queue */
   synchronized boolean remove(Block block, 
                               int oldReplicas, 
                               int decommissionedReplicas,
@@ -145,7 +150,7 @@ public class UnderReplicatedBlocks imple
     return remove(block, priLevel);
   }
       
-  /* remove a block from a under replication queue given a priority*/
+  /** remove a block from a under replication queue given a priority*/
   boolean remove(Block block, int priLevel) {
     if(priLevel >= 0 && priLevel < LEVEL 
         && priorityQueues.get(priLevel).remove(block)) {
@@ -174,7 +179,7 @@ public class UnderReplicatedBlocks imple
     return false;
   }
       
-  /* update the priority level of a block */
+  /** update the priority level of a block */
   synchronized void update(Block block, int curReplicas, 
                            int decommissionedReplicas,
                            int curExpectedReplicas,
@@ -209,30 +214,29 @@ public class UnderReplicatedBlocks imple
     }
   }
 
-  /* returns an iterator of all blocks in a given priority queue */
+  /** returns an iterator of all blocks in a given priority queue */
   synchronized BlockIterator iterator(int level) {
     return new BlockIterator(level);
   }
     
-  /* return an iterator of all the under replication blocks */
+  /** return an iterator of all the under replication blocks */
   public synchronized BlockIterator iterator() {
     return new BlockIterator();
   }
   
-  public class BlockIterator implements Iterator<Block> {
+  class BlockIterator implements Iterator<Block> {
     private int level;
     private boolean isIteratorForLevel = false;
     private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
 
-    BlockIterator()  
-    {
+    private BlockIterator() {
       level=0;
       for(int i=0; i<LEVEL; i++) {
         iterators.add(priorityQueues.get(i).iterator());
       }
     }
 
-    BlockIterator(int l) {
+    private BlockIterator(int l) {
       level = l;
       isIteratorForLevel = true;
       iterators.add(priorityQueues.get(level).iterator());
@@ -246,6 +250,7 @@ public class UnderReplicatedBlocks imple
       }
     }
 
+    @Override
     public Block next() {
       if (isIteratorForLevel)
         return iterators.get(0).next();
@@ -253,6 +258,7 @@ public class UnderReplicatedBlocks imple
       return iterators.get(level).next();
     }
 
+    @Override
     public boolean hasNext() {
       if (isIteratorForLevel)
         return iterators.get(0).hasNext();
@@ -260,6 +266,7 @@ public class UnderReplicatedBlocks imple
       return iterators.get(level).hasNext();
     }
 
+    @Override
     public void remove() {
       if (isIteratorForLevel) 
         iterators.get(0).remove();
@@ -267,8 +274,8 @@ public class UnderReplicatedBlocks imple
         iterators.get(level).remove();
     }
 
-    public int getPriority() {
+    int getPriority() {
       return level;
-    };
+    }
   }  
 }

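The class comment above describes a ladder of priority queues, with level 0 the most urgent and level 4 (QUEUE_WITH_CORRUPT_BLOCKS) holding blocks with no usable replica. An approximate, illustrative assignment of those levels; the thresholds below are assumptions, not the exact rules of getPriority():

    class ReplicationPrioritySketch {
      static final int LEVEL = 5;
      static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

      static int priority(int curReplicas, int expectedReplicas) {
        if (curReplicas <= 0) {
          return QUEUE_WITH_CORRUPT_BLOCKS;     // no live replica left
        } else if (curReplicas == 1) {
          return 0;                             // a single replica: most urgent
        } else if (curReplicas * 3 < expectedReplicas) {
          return 1;                             // badly under-replicated
        } else if (curReplicas < expectedReplicas) {
          return 2;                             // mildly under-replicated
        }
        return 3;                               // at or above the expected replication
      }

      public static void main(String[] args) {
        System.out.println(priority(1, 3));  // 0
        System.out.println(priority(2, 3));  // 2
      }
    }
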
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Tue Aug 16 00:37:15 2011
@@ -29,7 +29,10 @@ import org.apache.hadoop.classification.
  ************************************/
 
 @InterfaceAudience.Private
-public interface HdfsConstants {
+public final class HdfsConstants {
+  /* Hidden constructor */
+  private HdfsConstants() { }
+  
   /**
    * Type of the node
    */

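Turning HdfsConstants from an interface into a final class with a hidden constructor is the standard fix for the constant-interface antipattern: callers now reference the constants explicitly instead of implementing the interface just to inherit them. An illustrative, non-HDFS example of the pattern:

    // A constants holder that can be neither instantiated nor implemented.
    final class ExampleConstants {
      private ExampleConstants() { }              // hidden constructor

      static final int READ_TIMEOUT_MS = 60_000;  // hypothetical value, not from HDFS
    }

    class ExampleUser {
      int timeout() {
        return ExampleConstants.READ_TIMEOUT_MS;  // referenced explicitly, never inherited
      }

      public static void main(String[] args) {
        System.out.println(new ExampleUser().timeout());  // 60000
      }
    }
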
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Aug 16 00:37:15 2011
@@ -26,11 +26,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLEncoder;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.TreeSet;
 
 import javax.servlet.ServletContext;
@@ -190,13 +190,15 @@ public class JspHelper {
     s.connect(addr, HdfsConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
       
-      long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
+    long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
       // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, poolId, blockId);
-      BlockReader blockReader = BlockReader.newBlockReader(s, file,
+    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
+    String file = BlockReader.getFileName(addr, poolId, blockId);
+    BlockReader blockReader = BlockReader.newBlockReader(s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
+        offsetIntoBlock, amtToRead, bufferSize);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
@@ -249,7 +251,7 @@ public class JspHelper {
     out.print("</tbody></table>");
   }
 
-  public static void sortNodeList(ArrayList<DatanodeDescriptor> nodes,
+  public static void sortNodeList(final List<DatanodeDescriptor> nodes,
                            String field, String order) {
         
     class NodeComapare implements Comparator<DatanodeDescriptor> {

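The JspHelper hunk above swaps the inline "io.file.buffer.size"/4096 literals for named DFSConfigKeys constants. A small sketch of the same pattern against Hadoop's Configuration API; the two constants below are local stand-ins for the DFSConfigKeys fields:

    import org.apache.hadoop.conf.Configuration;

    class BufferSizeSketch {
      static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
      static final int BUFFER_SIZE_DEFAULT = 4096;

      // One place to resolve the buffer size instead of scattering string literals.
      static int bufferSize(Configuration conf) {
        return conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
      }
    }
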
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Aug 16 00:37:15 2011
@@ -54,7 +54,7 @@ import org.apache.hadoop.util.PureJavaCr
  * may copy it to another site. If a throttler is provided,
  * streaming throttling is also supported.
  **/
-class BlockReceiver implements Closeable, FSConstants {
+class BlockReceiver implements Closeable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -179,8 +179,7 @@ class BlockReceiver implements Closeable
         this.out = streams.dataOut;
         this.cout = streams.checksumOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-                                                  streams.checksumOut,
-                                                  SMALL_BUFFER_SIZE));
+            streams.checksumOut, FSConstants.SMALL_BUFFER_SIZE));
         // write data chunk header if creating a new replica
         if (isCreate) {
           BlockMetadataHeader.writeHeader(checksumOut, checksum);
@@ -399,7 +398,7 @@ class BlockReceiver implements Closeable
       buf.limit(bufRead);
     }
     
-    while (buf.remaining() < SIZE_OF_INTEGER) {
+    while (buf.remaining() < FSConstants.BYTES_IN_INTEGER) {
       if (buf.position() > 0) {
         shiftBufData();
       }
@@ -418,9 +417,10 @@ class BlockReceiver implements Closeable
                             payloadLen);
     }
     
-    // Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
+    // Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
     // we read above.
-    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;
+    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
+        - FSConstants.BYTES_IN_INTEGER;
     
     if (buf.remaining() < pktSize) {
       //we need to read more data
@@ -817,7 +817,7 @@ class BlockReceiver implements Closeable
   * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
-  class PacketResponder implements Runnable, Closeable, FSConstants {   
+  class PacketResponder implements Runnable, Closeable {   
 
     /** queue for packets waiting for ack */
     private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 

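With the FSConstants mixin dropped, BlockReceiver now names its buffer-size constant explicitly when it wraps the checksum stream. A self-contained sketch of that wrapping, using an illustrative buffer size rather than the real SMALL_BUFFER_SIZE value:

    import java.io.BufferedOutputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ChecksumStreamSketch {
      static final int SMALL_BUFFER_SIZE = 512;   // illustrative value

      static DataOutputStream wrap(ByteArrayOutputStream rawChecksumOut) {
        return new DataOutputStream(
            new BufferedOutputStream(rawChecksumOut, SMALL_BUFFER_SIZE));
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        DataOutputStream checksumOut = wrap(sink);
        checksumOut.writeInt(0xCAFEBABE);          // e.g. a 4-byte header word
        checksumOut.flush();                       // push buffered bytes through
        System.out.println(sink.size());           // 4
      }
    }
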
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Aug 16 00:37:15 2011
@@ -42,7 +42,7 @@ import org.apache.hadoop.util.DataChecks
 /**
  * Reads a block from the disk and sends it to a recipient.
  */
-class BlockSender implements java.io.Closeable, FSConstants {
+class BlockSender implements java.io.Closeable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -155,7 +155,7 @@ class BlockSender implements java.io.Clo
 
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
         checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
-            .getMetaDataInputStream(block), BUFFER_SIZE));
+            .getMetaDataInputStream(block), FSConstants.IO_FILE_BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -472,15 +472,15 @@ class BlockSender implements java.io.Clo
         streamForSendChunks = baseStream;
         
         // ensure a minimum buffer size.
-        maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
+        maxChunksPerPacket = (Math.max(FSConstants.IO_FILE_BUFFER_SIZE, 
                                        MIN_BUFFER_WITH_TRANSFERTO)
                               + bytesPerChecksum - 1)/bytesPerChecksum;
         
         // allocate smaller buffer while using transferTo(). 
         pktSize += checksumSize * maxChunksPerPacket;
       } else {
-        maxChunksPerPacket = Math.max(1,
-                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+        maxChunksPerPacket = Math.max(1, (FSConstants.IO_FILE_BUFFER_SIZE
+            + bytesPerChecksum - 1) / bytesPerChecksum);
         pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
       }
 

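The maxChunksPerPacket expression in the BlockSender hunk above is a ceiling division: how many whole checksum chunks are needed to cover the I/O buffer, with a floor of one chunk. A tiny sketch of the arithmetic:

    class PacketSizingSketch {
      // ceil(ioBufferSize / bytesPerChecksum), but never fewer than one chunk
      static int maxChunksPerPacket(int ioBufferSize, int bytesPerChecksum) {
        return Math.max(1, (ioBufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
      }

      public static void main(String[] args) {
        System.out.println(maxChunksPerPacket(4096, 512));  // 8
        System.out.println(maxChunksPerPacket(100, 512));   // 1
      }
    }
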

