hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1154899 [1/2] - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ ...
Date: Mon, 08 Aug 2011 10:06:47 GMT
Author: szetszwo
Date: Mon Aug  8 10:06:45 2011
New Revision: 1154899

URL: http://svn.apache.org/viewvc?rev=1154899&view=rev
Log:
HDFS-2228. Move block and datanode code from FSNamesystem to BlockManager and DatanodeManager.  (szetszwo)

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
    hadoop/common/trunk/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Mon Aug  8 10:06:45 2011
@@ -641,6 +641,9 @@ Trunk (unreleased changes)
 
     HDFS-2226. Clean up counting of operations in FSEditLogLoader (todd)
 
+    HDFS-2228. Move block and datanode code from FSNamesystem to
+    BlockManager and DatanodeManager.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Mon Aug  8 10:06:45 2011
@@ -259,26 +259,6 @@ public class BlockInfo extends Block imp
     return head;
   }
 
-  boolean listIsConsistent(DatanodeDescriptor dn) {
-    // going forward
-    int count = 0;
-    BlockInfo next, nextPrev;
-    BlockInfo cur = this;
-    while(cur != null) {
-      next = cur.getNext(cur.findDatanode(dn));
-      if(next != null) {
-        nextPrev = next.getPrevious(next.findDatanode(dn));
-        if(cur != nextPrev) {
-          System.out.println("Inconsistent list: cur->next->prev != cur");
-          return false;
-        }
-      }
-      cur = next;
-      count++;
-    }
-    return true;
-  }
-
   /**
    * BlockInfo represents a block that is not being constructed.
    * In order to start modifying the block, the BlockInfo should be converted
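
For context, the deleted listIsConsistent() verified the back-pointers of the
per-datanode doubly linked block list. A minimal standalone sketch of that
invariant, with a plain Node class standing in for the BlockInfo triplets
structure:

    class Node {
      Node prev, next;
    }

    class ListConsistency {
      static boolean isConsistent(Node head) {
        for (Node cur = head; cur != null; cur = cur.next) {
          if (cur.next != null && cur.next.prev != cur) {
            return false;  // cur -> next -> prev != cur
          }
        }
        return true;
      }
    }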

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Aug  8 10:06:45 2011
@@ -46,13 +46,15 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -60,8 +62,9 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
 /**
@@ -81,18 +84,13 @@ public class BlockManager {
   private volatile long pendingReplicationBlocksCount = 0L;
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long underReplicatedBlocksCount = 0L;
-  public volatile long scheduledReplicationBlocksCount = 0L;
+  private volatile long scheduledReplicationBlocksCount = 0L;
   private volatile long excessBlocksCount = 0L;
   private volatile long pendingDeletionBlocksCount = 0L;
   private boolean isBlockTokenEnabled;
   private long blockKeyUpdateInterval;
   private long blockTokenLifetime;
   private BlockTokenSecretManager blockTokenSecretManager;
-  
-  /** returns the isBlockTokenEnabled - true if block token enabled ,else false */
-  public boolean isBlockTokenEnabled() {
-    return isBlockTokenEnabled;
-  }
 
   /** get the BlockTokenSecretManager */
   public BlockTokenSecretManager getBlockTokenSecretManager() {
@@ -131,7 +129,7 @@ public class BlockManager {
    * Mapping: Block -> { INode, datanodes, self ref }
    * Updated only in response to client-sent information.
    */
-  public final BlocksMap blocksMap;
+  final BlocksMap blocksMap;
 
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
@@ -168,13 +166,13 @@ public class BlockManager {
   private final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
-  public final int maxReplication;
+  public final short maxReplication;
   /** The maximum number of outgoing replication streams
    *  a given node should have at one time 
    */
   int maxReplicationStreams;
   /** Minimum copies needed or else write is disallowed */
-  public final int minReplication;
+  public final short minReplication;
   /** Default number of replicas */
   public final int defaultReplication;
   /** The maximum number of entries returned by getCorruptInodes() */
@@ -189,30 +187,6 @@ public class BlockManager {
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
   
-  /**
-   * Get access keys
-   * 
-   * @return current access keys
-   */
-  public ExportedBlockKeys getBlockKeys() {
-    return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
-        : ExportedBlockKeys.DUMMY_KEYS;
-  }
-  
-  /** Generate block token for a LocatedBlock. */
-  public void setBlockToken(LocatedBlock l) throws IOException {
-    Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
-        .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
-    l.setBlockToken(token);
-  }
-
-  /** Generate block tokens for the blocks to be returned. */
-  public void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
-    for(LocatedBlock l : locatedBlocks) {
-      setBlockToken(l);
-    }
-  }
-  
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     namesystem = fsn;
     datanodeManager = new DatanodeManager(this, fsn, conf);
@@ -249,25 +223,28 @@ public class BlockManager {
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
     this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                                           DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-    this.maxReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
-                                      DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
-    this.minReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-                                      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
-    if (minReplication <= 0)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.namenode.replication.min = "
-                            + minReplication
-                            + " must be greater than 0");
-    if (maxReplication >= (int)Short.MAX_VALUE)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.max = "
-                            + maxReplication + " must be less than " + (Short.MAX_VALUE));
-    if (maxReplication < minReplication)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.namenode.replication.min = "
-                            + minReplication
-                            + " must be less than dfs.replication.max = "
-                            + maxReplication);
+
+    final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
+                                 DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
+    final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+                                 DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+    if (minR <= 0)
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR + " <= 0");
+    if (maxR > Short.MAX_VALUE)
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+          + " = " + maxR + " > " + Short.MAX_VALUE);
+    if (minR > maxR)
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR + " > "
+          + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+          + " = " + maxR);
+    this.minReplication = (short)minR;
+    this.maxReplication = (short)maxR;
+
     this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
     this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
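
A hypothetical restatement of the three bounds checks above, with the config
keys written out as plain strings; the limits must fit in a short because the
new minReplication and maxReplication fields are shorts:

    import java.io.IOException;

    class ReplicationConfigCheck {
      static void validate(int minR, int maxR) throws IOException {
        if (minR <= 0)
          throw new IOException("dfs.namenode.replication.min = " + minR + " <= 0");
        if (maxR > Short.MAX_VALUE)
          throw new IOException("dfs.replication.max = " + maxR + " > " + Short.MAX_VALUE);
        if (minR > maxR)
          throw new IOException("dfs.namenode.replication.min = " + minR
              + " > dfs.replication.max = " + maxR);
      }
      // validate(1, 512)   -> ok (the defaults)
      // validate(0, 512)   -> throws: min must be positive
      // validate(3, 40000) -> throws: exceeds Short.MAX_VALUE (32767)
      // validate(5, 3)     -> throws: min greater than max
    }
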
@@ -517,7 +494,7 @@ public class BlockManager {
     }
 
     long fileLength = fileINode.computeContentSummary().getLength();
-    return getBlockLocation(ucBlock, fileLength - ucBlock.getNumBytes());
+    return createLocatedBlock(ucBlock, fileLength - ucBlock.getNumBytes());
   }
 
   /**
@@ -537,8 +514,9 @@ public class BlockManager {
     return machineSet;
   }
 
-  public List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
-      long length, int nrBlocksToReturn) throws IOException {
+  private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
+      final long offset, final long length, final int nrBlocksToReturn
+      ) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@@ -557,7 +535,7 @@ public class BlockManager {
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      results.add(getBlockLocation(blocks[curBlk], curPos));
+      results.add(createLocatedBlock(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -567,7 +545,7 @@ public class BlockManager {
   }
 
   /** @return a LocatedBlock for the given block */
-  public LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
       ) throws IOException {
     if (blk instanceof BlockInfoUnderConstruction) {
       if (blk.isComplete()) {
@@ -608,6 +586,76 @@ public class BlockManager {
     return new LocatedBlock(eb, machines, pos, isCorrupt);
   }
 
+  /** Create a LocatedBlocks. */
+  public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
+      final long fileSizeExcludeBlocksUnderConstruction,
+      final boolean isFileUnderConstruction,
+      final long offset, final long length, final boolean needBlockToken
+      ) throws IOException {
+    assert namesystem.hasReadOrWriteLock();
+    if (blocks == null) {
+      return null;
+    } else if (blocks.length == 0) {
+      return new LocatedBlocks(0, isFileUnderConstruction,
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+      }
+      final List<LocatedBlock> locatedblocks = createLocatedBlockList(
+          blocks, offset, length, Integer.MAX_VALUE);
+
+      final BlockInfo last = blocks[blocks.length - 1];
+      final long lastPos = last.isComplete()?
+          fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
+          : fileSizeExcludeBlocksUnderConstruction;
+      final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
+
+      if (isBlockTokenEnabled && needBlockToken) {
+        for(LocatedBlock lb : locatedblocks) {
+          setBlockToken(lb, AccessMode.READ);
+        }
+        setBlockToken(lastlb, AccessMode.READ);
+      }
+      return new LocatedBlocks(
+          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
+          locatedblocks, lastlb, last.isComplete());
+    }
+  }
+
+  /** @return current access keys. */
+  public ExportedBlockKeys getBlockKeys() {
+    return isBlockTokenEnabled? blockTokenSecretManager.exportKeys()
+        : ExportedBlockKeys.DUMMY_KEYS;
+  }
+
+  /** Generate a block token for the located block. */
+  public void setBlockToken(final LocatedBlock b,
+      final BlockTokenSecretManager.AccessMode mode) throws IOException {
+    if (isBlockTokenEnabled) {
+      b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(), 
+          EnumSet.of(mode)));
+    }    
+  }
+
+  void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
+      final DatanodeDescriptor nodeinfo) {
+    // check access key update
+    if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+      cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
+      nodeinfo.needKeyUpdate = false;
+    }
+  }
+
+  /**
+   * Clamp the specified replication between the minimum and the maximum
+   * replication levels.
+   */
+  public short adjustReplication(short replication) {
+    return replication < minReplication? minReplication
+        : replication > maxReplication? maxReplication: replication;
+  }
+
   /**
    * Check whether the replication parameter is within the range
    * determined by system configuration.
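
The clamping rule in the new adjustReplication() can be illustrated with a
small self-contained demo, assuming the default limits minReplication = 1 and
maxReplication = 512:

    public class AdjustReplicationDemo {
      static final short MIN = 1, MAX = 512;

      static short adjust(short replication) {
        return replication < MIN ? MIN
            : replication > MAX ? MAX : replication;
      }

      public static void main(String[] args) {
        System.out.println(adjust((short) 0));    // 1   (raised to the minimum)
        System.out.println(adjust((short) 3));    // 3   (already in range)
        System.out.println(adjust((short) 9999)); // 512 (capped at the maximum)
      }
    }
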
@@ -639,7 +687,7 @@ public class BlockManager {
       final long size) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+      NameNode.stateChangeLog.warn("BLOCK* getBlocks: "
           + "Asking for blocks from an unrecorded node " + datanode.getName());
       throw new HadoopIllegalArgumentException(
           "Datanode " + datanode.getName() + " not found.");
@@ -711,7 +759,7 @@ public class BlockManager {
    * @param dn datanode
    * @param log true to create an entry in the log 
    */
-  void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
+  private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
     Collection<Block> invalidateSet = recentInvalidateSets
         .get(dn.getStorageID());
     if (invalidateSet == null) {
@@ -721,7 +769,7 @@ public class BlockManager {
     if (invalidateSet.add(b)) {
       pendingDeletionBlocksCount++;
       if (log) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+        NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
             + b + " to " + dn.getName());
       }
     }
@@ -734,7 +782,7 @@ public class BlockManager {
    * @param b block
    * @param dn datanode
    */
-  public void addToInvalidates(Block b, DatanodeInfo dn) {
+  void addToInvalidates(Block b, DatanodeInfo dn) {
     addToInvalidates(b, dn, true);
   }
 
@@ -751,7 +799,7 @@ public class BlockManager {
       datanodes.append(node.getName()).append(" ");
     }
     if (datanodes.length() != 0) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+      NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
           + b + " to " + datanodes.toString());
     }
   }
@@ -775,20 +823,29 @@ public class BlockManager {
     }
   }
 
-  public void findAndMarkBlockAsCorrupt(Block blk,
-                                 DatanodeInfo dn) throws IOException {
-    BlockInfo storedBlock = getStoredBlock(blk);
-    if (storedBlock == null) {
-      // Check if the replica is in the blockMap, if not
-      // ignore the request for now. This could happen when BlockScanner
-      // thread of Datanode reports bad block before Block reports are sent
-      // by the Datanode on startup
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.markBlockAsCorrupt: " +
-                                   "block " + blk + " could not be marked as " +
-                                   "corrupt as it does not exist in blocksMap");
-      return;
+  /**
+   * Mark the block belonging to datanode as corrupt
+   * @param blk Block to be marked as corrupt
+   * @param dn Datanode which holds the corrupt replica
+   */
+  public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
+      final DatanodeInfo dn) throws IOException {
+    namesystem.writeLock();
+    try {
+      final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
+      if (storedBlock == null) {
+        // Check if the replica is in the blockMap, if not
+        // ignore the request for now. This could happen when BlockScanner
+        // thread of Datanode reports bad block before Block reports are sent
+        // by the Datanode on startup
+        NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
+            + blk + " not found.");
+        return;
+      }
+      markBlockAsCorrupt(storedBlock, dn);
+    } finally {
+      namesystem.writeUnlock();
     }
-    markBlockAsCorrupt(storedBlock, dn);
   }
 
   private void markBlockAsCorrupt(BlockInfo storedBlock,
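
findAndMarkBlockAsCorrupt() now takes the namesystem write lock itself instead
of relying on its caller. A minimal sketch of the lock-wrapping idiom used
here, with a hypothetical Namesystem holder around a ReentrantReadWriteLock:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class Namesystem {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      void writeLock()   { lock.writeLock().lock(); }
      void writeUnlock() { lock.writeLock().unlock(); }
    }

    class LockingSketch {
      private final Namesystem namesystem = new Namesystem();

      void doUnderWriteLock(Runnable work) {
        namesystem.writeLock();
        try {
          work.run();               // e.g. markBlockAsCorrupt(...)
        } finally {
          namesystem.writeUnlock(); // released even if work throws
        }
      }
    }
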
@@ -804,7 +861,7 @@ public class BlockManager {
 
     INodeFile inode = storedBlock.getINode();
     if (inode == null) {
-      NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
+      NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " +
                                    "block " + storedBlock +
                                    " could not be marked as corrupt as it" +
                                    " does not belong to any file");
@@ -831,13 +888,12 @@ public class BlockManager {
    */
   private void invalidateBlock(Block blk, DatanodeInfo dn)
       throws IOException {
-    NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+    NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
                                  + blk + " on " + dn.getName());
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot invalidate block " + blk +
-                            " because datanode " + dn.getName() +
-                            " does not exist.");
+      throw new IOException("Cannot invalidate block " + blk
+          + " because datanode " + dn.getName() + " does not exist.");
     }
 
     // Check how many copies we have of the block. If we have at least one
@@ -847,14 +903,12 @@ public class BlockManager {
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, node);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
-            + blk + " on "
-            + dn.getName() + " listed for deletion.");
+        NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
+            + blk + " on " + dn.getName() + " listed for deletion.");
       }
     } else {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-          + blk + " on " + dn.getName()
-          + " is the only copy and was not deleted.");
+      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
+          + dn.getName() + " is the only copy and was not deleted.");
     }
   }
 
@@ -1286,20 +1340,51 @@ public class BlockManager {
   }
 
   /**
-   * The given node is reporting all its blocks.  Use this info to
-   * update the (datanode-->blocklist) and (block-->nodelist) tables.
+   * The given datanode is reporting all its blocks.
+   * Update the (machine-->blocklist) and (block-->machinelist) maps.
    */
-  public void processReport(DatanodeDescriptor node, BlockListAsLongs report) 
-  throws IOException {
-    
-    boolean isFirstBlockReport = (node.numBlocks() == 0);
-    if (isFirstBlockReport) {
-      // Initial block reports can be processed a lot more efficiently than
-      // ordinary block reports.  This shortens NN restart times.
-      processFirstBlockReport(node, report);
-      return;
-    } 
+  public void processReport(final DatanodeID nodeID, final String poolId,
+      final BlockListAsLongs newReport) throws IOException {
+    namesystem.writeLock();
+    final long startTime = Util.now(); //after acquiring write lock
+    final long endTime;
+    try {
+      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException("ProcessReport from dead or unregistered node: "
+                              + nodeID.getName());
+      }
+
+      // To minimize startup time, we discard any second (or later) block reports
+      // that we receive while still in startup phase.
+      if (namesystem.isInStartupSafeMode() && node.numBlocks() > 0) {
+        NameNode.stateChangeLog.info("BLOCK* processReport: "
+            + "discarded non-initial block report from " + nodeID.getName()
+            + " because namenode still in startup phase");
+        return;
+      }
+
+      if (node.numBlocks() == 0) {
+        // The first block report can be processed a lot more efficiently than
+        // ordinary block reports.  This shortens restart times.
+        processFirstBlockReport(node, newReport);
+      } else {
+        processReport(node, newReport);
+      }
+    } finally {
+      endTime = Util.now();
+      namesystem.writeUnlock();
+    }
 
+    // Log the block report processing stats from Namenode perspective
+    NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
+    NameNode.stateChangeLog.info("BLOCK* processReport: from "
+        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
+  }
+
+  private void processReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
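
A hedged restatement of the dispatch decision in the new processReport(),
reduced to the two inputs it actually depends on:

    class ReportDispatch {
      enum Action { DISCARD, FIRST_REPORT, DIFF_REPORT }

      static Action dispatch(boolean inStartupSafeMode, int blocksKnownFromNode) {
        if (inStartupSafeMode && blocksKnownFromNode > 0) {
          return Action.DISCARD;    // non-initial report during startup: skip it
        }
        return blocksKnownFromNode == 0
            ? Action.FIRST_REPORT   // fast path; shortens restart times
            : Action.DIFF_REPORT;   // diff against the previously known blocks
      }
    }

Note also that startTime is read only after writeLock() returns, so the logged
processing time excludes time spent waiting for the lock.
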
@@ -1322,7 +1407,7 @@ public class BlockManager {
       addStoredBlock(b, node, null, true);
     }
     for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
+      NameNode.stateChangeLog.info("BLOCK* processReport: block "
           + b + " on " + node.getName() + " size " + b.getNumBytes()
           + " does not belong to any file.");
       addToInvalidates(b, node);
@@ -1343,8 +1428,8 @@ public class BlockManager {
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report) 
-  throws IOException {
+  private void processFirstBlockReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
     assert (node.numBlocks() == 0);
@@ -1441,12 +1526,12 @@ public class BlockManager {
    * @param toUC replicas of blocks currently under construction
    * @return
    */
-  BlockInfo processReportedBlock(DatanodeDescriptor dn, 
-      Block block, ReplicaState reportedState, 
-      Collection<BlockInfo> toAdd, 
-      Collection<Block> toInvalidate, 
-      Collection<BlockInfo> toCorrupt,
-      Collection<StatefulBlockInfo> toUC) {
+  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+      final Block block, final ReplicaState reportedState, 
+      final Collection<BlockInfo> toAdd, 
+      final Collection<Block> toInvalidate, 
+      final Collection<BlockInfo> toCorrupt,
+      final Collection<StatefulBlockInfo> toUC) {
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("Reported block " + block
@@ -1616,11 +1701,9 @@ public class BlockManager {
     }
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                   + "addStoredBlock request received for "
-                                   + block + " on " + node.getName()
-                                   + " size " + block.getNumBytes()
-                                   + " But it does not belong to any file.");
+      NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
+          + node.getName() + " size " + block.getNumBytes()
+          + " but it does not belong to any file.");
       // we could add this block to invalidate set of this datanode.
       // it will happen in next block report otherwise.
       return block;
@@ -1636,13 +1719,13 @@ public class BlockManager {
     if (added) {
       curReplicaDelta = 1;
       if (logEveryBlock) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+        NameNode.stateChangeLog.info("BLOCK* addStoredBlock: "
             + "blockMap updated: " + node.getName() + " is added to " + 
             storedBlock + " size " + storedBlock.getNumBytes());
       }
     } else {
       curReplicaDelta = 0;
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
+      NameNode.stateChangeLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node.getName() + " size " + storedBlock.getNumBytes());
     }
@@ -1778,13 +1861,39 @@ public class BlockManager {
     LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
   }
 
+  /** Set replication for the blocks. */
+  public void setReplication(final short oldRepl, final short newRepl,
+      final String src, final Block... blocks) throws IOException {
+    if (newRepl == oldRepl) {
+      return;
+    }
+
+    // update needReplication priority queues
+    for(Block b : blocks) {
+      updateNeededReplications(b, 0, newRepl-oldRepl);
+    }
+      
+    if (oldRepl > newRepl) {
+      // old replication > the new one; need to remove copies
+      LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+      for(Block b : blocks) {
+        processOverReplicatedBlock(b, newRepl, null, null);
+      }
+    } else { // replication factor is increased
+      LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+    }
+  }
+
   /**
    * Find how many of the containing nodes are "extra", if any.
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  public void processOverReplicatedBlock(Block block, short replication,
-      DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+  private void processOverReplicatedBlock(final Block block,
+      final short replication, final DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
@@ -1806,12 +1915,112 @@ public class BlockManager {
         }
       }
     }
-    namesystem.chooseExcessReplicates(nonExcess, block, replication, 
+    chooseExcessReplicates(nonExcess, block, replication, 
         addedNode, delNodeHint, blockplacement);
   }
 
 
-  public void addToExcessReplicate(DatanodeInfo dn, Block block) {
+  /**
+   * We want "replication" replicates for the block, but we now have too many.  
+   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
+   *
+   * srcNodes.size() - dstNodes.size() == replication
+   *
+   * We pick node that make sure that replicas are spread across racks and
+   * also try hard to pick one with least free space.
+   * The algorithm is first to pick a node with least free space from nodes
+   * that are on a rack holding more than one replicas of the block.
+   * So removing such a replica won't remove a rack. 
+   * If no such a node is available,
+   * then pick a node with least free space
+   */
+  private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
+                              Block b, short replication,
+                              DatanodeDescriptor addedNode,
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
+    assert namesystem.hasWriteLock();
+    // first form a rack to datanodes map and
+    INodeFile inode = getINode(b);
+    final Map<String, List<DatanodeDescriptor>> rackMap
+        = new HashMap<String, List<DatanodeDescriptor>>();
+    for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
+        iter.hasNext(); ) {
+      final DatanodeDescriptor node = iter.next();
+      final String rackName = node.getNetworkLocation();
+      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rackName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    // split nodes into two sets
+    // priSet contains nodes on rack with more than one replica
+    // remains contains the remaining nodes
+    final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
+    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+      if (datanodeList.size() == 1 ) {
+        remains.add(datanodeList.get(0));
+      } else {
+        priSet.addAll(datanodeList);
+      }
+    }
+    
+    // pick one node to delete that favors the delete hint
+    // otherwise pick one with least space from priSet if it is not empty
+    // otherwise one node with least space from remains
+    boolean firstOne = true;
+    while (nonExcess.size() - replication > 0) {
+      // check if we can delete delNodeHint
+      final DatanodeInfo cur;
+      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
+          && (priSet.contains(delNodeHint)
+              || (addedNode != null && !priSet.contains(addedNode))) ) {
+        cur = delNodeHint;
+      } else { // regular excessive replica removal
+        cur = replicator.chooseReplicaToDelete(inode, b, replication,
+            priSet, remains);
+      }
+      firstOne = false;
+
+      // adjust rackmap, priSet, and remains
+      String rack = cur.getNetworkLocation();
+      final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+      datanodes.remove(cur);
+      if (datanodes.isEmpty()) {
+        rackMap.remove(rack);
+      }
+      if (priSet.remove(cur)) {
+        if (datanodes.size() == 1) {
+          priSet.remove(datanodes.get(0));
+          remains.add(datanodes.get(0));
+        }
+      } else {
+        remains.remove(cur);
+      }
+
+      nonExcess.remove(cur);
+      addToExcessReplicate(cur, b);
+
+      //
+      // The 'excessblocks' tracks blocks until we get confirmation
+      // that the datanode has deleted them; the only way we remove them
+      // is when we get a "removeBlock" message.  
+      //
+      // The 'invalidate' list is used to inform the datanode the block 
+      // should be deleted.  Items are removed from the invalidate list
+      // upon giving instructions to the namenode.
+      //
+      addToInvalidates(b, cur);
+      NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
+                +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
+    }
+  }
+
+  private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
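
The rack bookkeeping in chooseExcessReplicates() can be exercised on its own.
A standalone sketch with made-up datanode and rack names: racks holding more
than one replica feed the priSet (deleting there cannot lose a rack), while
single-replica racks go to remains:

    import java.util.*;

    public class ExcessReplicaSketch {
      public static void main(String[] args) {
        // hypothetical replica locations: datanode -> rack
        final Map<String, String> replicaRacks = new LinkedHashMap<String, String>();
        replicaRacks.put("dn1", "/rackA");
        replicaRacks.put("dn2", "/rackA");
        replicaRacks.put("dn3", "/rackB");
        replicaRacks.put("dn4", "/rackC");

        // first form a rack -> datanodes map, as in the real code
        final Map<String, List<String>> rackMap = new HashMap<String, List<String>>();
        for (Map.Entry<String, String> e : replicaRacks.entrySet()) {
          List<String> datanodes = rackMap.get(e.getValue());
          if (datanodes == null) {
            datanodes = new ArrayList<String>();
            rackMap.put(e.getValue(), datanodes);
          }
          datanodes.add(e.getKey());
        }

        // then split into the two candidate sets
        final List<String> priSet = new ArrayList<String>();
        final List<String> remains = new ArrayList<String>();
        for (List<String> datanodes : rackMap.values()) {
          if (datanodes.size() == 1) {
            remains.add(datanodes.get(0));
          } else {
            priSet.addAll(datanodes);
          }
        }
        System.out.println("priSet  = " + priSet);  // dn1 and dn2 (rackA holds two)
        System.out.println("remains = " + remains); // dn3 and dn4, in map order
      }
    }
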
@@ -1821,7 +2030,7 @@ public class BlockManager {
     if (excessBlocks.add(block)) {
       excessBlocksCount++;
       if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates:"
+        NameNode.stateChangeLog.debug("BLOCK* addToExcessReplicate:"
             + " (" + dn.getName() + ", " + block
             + ") is added to excessReplicateMap");
       }
@@ -1834,14 +2043,14 @@ public class BlockManager {
    */
   private void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+      NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
           + block + " from " + node.getName());
     }
     assert (namesystem.hasWriteLock());
     {
       if (!blocksMap.removeNode(block, node)) {
         if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+          NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
               + block + " has already been removed from node " + node);
         }
         return;
@@ -1869,8 +2078,7 @@ public class BlockManager {
         if (excessBlocks.remove(block)) {
           excessBlocksCount--;
           if(NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "BLOCK* NameSystem.removeStoredBlock: "
+            NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
                 + block + " is removed from excessBlocks");
           }
           if (excessBlocks.size() == 0) {
@@ -1902,7 +2110,7 @@ public class BlockManager {
   /**
    * The given node is reporting that it received a certain block.
    */
-  public void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  private void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
     node.decBlocksScheduled();
@@ -1912,9 +2120,8 @@ public class BlockManager {
     if (delHint != null && delHint.length() != 0) {
       delHintNode = datanodeManager.getDatanode(delHint);
       if (delHintNode == null) {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
-            + block + " is expected to be removed from an unrecorded node "
-            + delHint);
+        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + block
+            + " is expected to be removed from an unrecorded node " + delHint);
       }
     }
 
@@ -1942,7 +2149,7 @@ public class BlockManager {
       addStoredBlock(b, node, delHintNode, true);
     }
     for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+      NameNode.stateChangeLog.info("BLOCK* addBlock: block "
           + b + " on " + node.getName() + " size " + b.getNumBytes()
           + " does not belong to any file.");
       addToInvalidates(b, node);
@@ -1952,6 +2159,30 @@ public class BlockManager {
     }
   }
 
+  /** The given node is reporting that it received a certain block. */
+  public void blockReceived(final DatanodeID nodeID, final String poolId,
+      final Block block, final String delHint) throws IOException {
+    namesystem.writeLock();
+    try {
+      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        final String s = block + " is received from dead or unregistered node "
+            + nodeID.getName();
+        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
+        throw new IOException(s);
+      } 
+
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
+            + " is received from " + nodeID.getName());
+      }
+
+      addBlock(node, block, delHint);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
   /**
    * Return the number of nodes that are live and decommissioned.
    */
@@ -2142,9 +2373,9 @@ public class BlockManager {
     return b;
   }
 
-  /* updates a block in under replication queue */
-  public void updateNeededReplications(Block block, int curReplicasDelta,
-      int expectedReplicasDelta) {
+  /** updates a block in under replication queue */
+  private void updateNeededReplications(final Block block,
+      final int curReplicasDelta, int expectedReplicasDelta) {
     namesystem.writeLock();
     try {
       NumberReplicas repl = countNodes(block);
@@ -2303,8 +2534,9 @@ public class BlockManager {
     return blocksMap.getINode(b);
   }
 
-  public void removeFromCorruptReplicasMap(Block block) {
-    corruptReplicas.removeFromCorruptReplicasMap(block);
+  /** @return an iterator of the datanodes. */
+  public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
+    return blocksMap.nodeIterator(block);
   }
 
   public int numCorruptReplicas(Block block) {
@@ -2313,6 +2545,8 @@ public class BlockManager {
 
   public void removeBlockFromMap(Block block) {
     blocksMap.removeBlock(block);
+    // If block is removed from blocksMap remove it from corruptReplicasMap
+    corruptReplicas.removeFromCorruptReplicasMap(block);
   }
 
   public int getCapacity() {
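
With the corrupt-replica cleanup folded into removeBlockFromMap(), callers such
as FSDirectory (see its diff below) drop their separate call. Hypothetical
before/after at a call site:

    // before HDFS-2228
    getBlockManager().removeBlockFromMap(block);
    getBlockManager().removeFromCorruptReplicasMap(block);

    // after HDFS-2228: one call keeps the two maps consistent
    getBlockManager().removeBlockFromMap(block);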

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Mon Aug  8 10:06:45 2011
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.util.Light
  * block's metadata currently includes INode it belongs to and
  * the datanodes that store the block.
  */
-public class BlocksMap {
+class BlocksMap {
   private static class NodeIterator implements Iterator<DatanodeDescriptor> {
     private BlockInfo blockInfo;
     private int nextIdx = 0;
@@ -101,7 +101,7 @@ public class BlocksMap {
   /**
    * Add block b belonging to the specified file inode to the map.
    */
-  public BlockInfo addINode(BlockInfo b, INodeFile iNode) {
+  BlockInfo addINode(BlockInfo b, INodeFile iNode) {
     BlockInfo info = blocks.get(b);
     if (info != b) {
       info = b;
@@ -137,7 +137,7 @@ public class BlocksMap {
    * Searches for the block in the BlocksMap and 
    * returns Iterator that iterates through the nodes the block belongs to.
    */
-  public Iterator<DatanodeDescriptor> nodeIterator(Block b) {
+  Iterator<DatanodeDescriptor> nodeIterator(Block b) {
     return nodeIterator(blocks.get(b));
   }
 
@@ -182,27 +182,6 @@ public class BlocksMap {
   Iterable<BlockInfo> getBlocks() {
     return blocks;
   }
-
-  /**
-   * Check if the block exists in map
-   */
-  public boolean contains(Block block) {
-    return blocks.contains(block);
-  }
-  
-  /**
-   * Check if the replica at the given datanode exists in map
-   */
-  boolean contains(Block block, DatanodeDescriptor datanode) {
-    BlockInfo info = blocks.get(block);
-    if (info == null)
-      return false;
-    
-    if (-1 == info.findDatanode(datanode))
-      return false;
-    
-    return true;
-  }
   
   /** Get the capacity of the HashMap that stores blocks */
   int getCapacity() {

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Aug  8 10:06:45 2011
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -405,7 +406,7 @@ public class DatanodeManager {
    * @param nodeList
    *          , array list of live or dead nodes.
    */
-  public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
+  private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
     // If the include list is empty, any nodes are welcomed and it does not
     // make sense to exclude any nodes from the cluster. Therefore, no remove.
     if (hostsReader.getHosts().isEmpty()) {
@@ -563,7 +564,7 @@ public class DatanodeManager {
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = namesystem.getBlockManager().getBlockKeys();
+    nodeReg.exportedKeys = blockManager.getBlockKeys();
       
     NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
         + "node registration from " + nodeReg.getName()
@@ -710,16 +711,59 @@ public class DatanodeManager {
     return numDead;
   }
 
+  /** @return list of datanodes where decommissioning is in progress. */
+  public List<DatanodeDescriptor> getDecommissioningNodes() {
+    namesystem.readLock();
+    try {
+      final List<DatanodeDescriptor> decommissioningNodes
+          = new ArrayList<DatanodeDescriptor>();
+      final List<DatanodeDescriptor> results = getDatanodeListForReport(
+          DatanodeReportType.LIVE);
+      for(DatanodeDescriptor node : results) {
+        if (node.isDecommissionInProgress()) {
+          decommissioningNodes.add(node);
+        }
+      }
+      return decommissioningNodes;
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+
   /** Fetch live and dead datanodes. */
-  public void fetchDatanodess(final List<DatanodeDescriptor> live, 
-      final List<DatanodeDescriptor> dead) {
-    final List<DatanodeDescriptor> results =
-        getDatanodeListForReport(DatanodeReportType.ALL);    
-    for(DatanodeDescriptor node : results) {
-      if (isDatanodeDead(node))
-        dead.add(node);
-      else
-        live.add(node);
+  public void fetchDatanodes(final List<DatanodeDescriptor> live, 
+      final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
+    if (live == null && dead == null) {
+      throw new HadoopIllegalArgumentException("Both live and dead lists are null");
+    }
+
+    namesystem.readLock();
+    try {
+      final List<DatanodeDescriptor> results =
+          getDatanodeListForReport(DatanodeReportType.ALL);    
+      for(DatanodeDescriptor node : results) {
+        if (isDatanodeDead(node)) {
+          if (dead != null) {
+            dead.add(node);
+          }
+        } else {
+          if (live != null) {
+            live.add(node);
+          }
+        }
+      }
+    } finally {
+      namesystem.readUnlock();
+    }
+    
+    if (removeDecommissionNode) {
+      if (live != null) {
+        removeDecomNodeFromList(live);
+      }
+      if (dead != null) {
+        removeDecomNodeFromList(dead);
+      }
     }
   }
 
@@ -847,7 +891,7 @@ public class DatanodeManager {
               blockPoolId, blks));
         }
         
-        namesystem.addKeyUpdateCommand(cmds, nodeinfo);
+        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
         // check for balancer bandwidth update
         if (nodeinfo.getBalancerBandwidth() > 0) {
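
The renamed fetchDatanodes() (formerly the misspelled fetchDatanodess) also
gains null handling and an option to drop decommissioned entries. A sketch of
hypothetical caller code, given an existing DatanodeManager instance dm:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
    dm.fetchDatanodes(live, dead, true);   // remove decommissioned nodes from both
    dm.fetchDatanodes(live, null, false);  // only the live list is wanted
    // dm.fetchDatanodes(null, null, false) throws HadoopIllegalArgumentException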

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Aug  8 10:06:45 2011
@@ -26,11 +26,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLEncoder;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.TreeSet;
 
 import javax.servlet.ServletContext;
@@ -249,7 +249,7 @@ public class JspHelper {
     out.print("</tbody></table>");
   }
 
-  public static void sortNodeList(ArrayList<DatanodeDescriptor> nodes,
+  public static void sortNodeList(final List<DatanodeDescriptor> nodes,
                            String field, String order) {
         
     class NodeComapare implements Comparator<DatanodeDescriptor> {
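
Widening sortNodeList()'s parameter from ArrayList to List lets callers pass
any List implementation. A one-line usage sketch (the field and order values
here are hypothetical):

    final List<DatanodeDescriptor> nodes = new LinkedList<DatanodeDescriptor>();
    JspHelper.sortNodeList(nodes, "name", "ASC");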

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Aug  8 10:06:45 2011
@@ -444,8 +444,6 @@ public class FSDirectory implements Clos
       // modify file-> block and blocksMap
       fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
-      // If block is removed from blocksMap remove it from corruptReplicasMap
-      getBlockManager().removeFromCorruptReplicasMap(block);
 
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
@@ -809,7 +807,7 @@ public class FSDirectory implements Clos
    * @return array of file blocks
    * @throws QuotaExceededException
    */
-  Block[] setReplication(String src, short replication, int[] oldReplication)
+  Block[] setReplication(String src, short replication, short[] oldReplication)
       throws QuotaExceededException, UnresolvedLinkException {
     waitForReady();
     Block[] fileBlocks = null;
@@ -826,14 +824,10 @@ public class FSDirectory implements Clos
 
   Block[] unprotectedSetReplication(String src, 
                                     short replication,
-                                    int[] oldReplication
+                                    short[] oldReplication
                                     ) throws QuotaExceededException, 
                                     UnresolvedLinkException {
     assert hasWriteLock();
-    if (oldReplication == null) {
-      oldReplication = new int[1];
-    }
-    oldReplication[0] = -1;
 
     INode[] inodes = rootDir.getExistingPathINodes(src, true);
     INode inode = inodes[inodes.length - 1];
@@ -845,14 +839,17 @@ public class FSDirectory implements Clos
       return null;
     }
     INodeFile fileNode = (INodeFile)inode;
-    oldReplication[0] = fileNode.getReplication();
+    final short oldRepl = fileNode.getReplication();
 
     // check disk quota
-    long dsDelta = (replication - oldReplication[0]) *
-         (fileNode.diskspaceConsumed()/oldReplication[0]);
+    long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
     updateCount(inodes, inodes.length-1, 0, dsDelta, true);
 
     fileNode.setReplication(replication);
+
+    if (oldReplication != null) {
+      oldReplication[0] = oldRepl;
+    }
     return fileNode.getBlocks();
   }
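
A worked example, with made-up numbers, of the disk-quota delta computed in
unprotectedSetReplication() above:

    public class QuotaDeltaDemo {
      public static void main(String[] args) {
        long diskspaceConsumed = 600;  // total bytes across all replicas
        short oldRepl = 3;             // 600 / 3 = 200 bytes per replica
        short replication = 5;         // the new replication factor
        long dsDelta = (replication - oldRepl) * (diskspaceConsumed / oldRepl);
        System.out.println(dsDelta);   // 400: two extra replicas at 200 bytes each
      }
    }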
 
@@ -2075,8 +2072,9 @@ public class FSDirectory implements Clos
         size = fileNode.computeFileSize(true);
         replication = fileNode.getReplication();
         blocksize = fileNode.getPreferredBlockSize();
-        loc = getFSNamesystem().getBlockLocationsInternal(
-            fileNode, 0L, size, false);
+        loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
+            fileNode.getBlocks(), fileNode.computeFileSize(false),
+            fileNode.isUnderConstruction(), 0L, size, false);
         if (loc==null) {
           loc = new LocatedBlocks();
         }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Aug  8 10:06:45 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -144,8 +143,8 @@ public class FSEditLogLoader {
 
             // versions > 0 support per file replication
             // get name and replication
-            short replication
-              = fsNamesys.adjustReplication(addCloseOp.replication);
+            final short replication  = fsNamesys.getBlockManager(
+                ).adjustReplication(addCloseOp.replication);
 
             long blockSize = addCloseOp.blockSize;
             BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
@@ -218,8 +217,8 @@ public class FSEditLogLoader {
           }
           case OP_SET_REPLICATION: {
             SetReplicationOp setReplicationOp = (SetReplicationOp)op;
-            short replication
-              = fsNamesys.adjustReplication(setReplicationOp.replication);
+            short replication = fsNamesys.getBlockManager().adjustReplication(
+                setReplicationOp.replication);
             fsDir.unprotectedSetReplication(setReplicationOp.path,
                                             replication, null);
             break;

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Aug  8 10:06:45 2011
@@ -330,7 +330,7 @@ class FSImageFormat {
     
     int imgVersion = getLayoutVersion();
     short replication = in.readShort();
-    replication = namesystem.adjustReplication(replication);
+    replication = namesystem.getBlockManager().adjustReplication(replication);
     modificationTime = in.readLong();
     if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imgVersion)) {
       atime = in.readLong();

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug  8 10:06:45 2011
@@ -39,10 +39,8 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -74,7 +72,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -88,12 +85,12 @@ import org.apache.hadoop.hdfs.protocol.Q
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
@@ -110,11 +107,9 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -394,7 +389,7 @@ public class FSNamesystem implements FSC
     return this.fsLock.getReadHoldCount() > 0;
   }
 
-  boolean hasReadOrWriteLock() {
+  public boolean hasReadOrWriteLock() {
     return hasReadLock() || hasWriteLock();
   }
 
@@ -534,14 +529,12 @@ public class FSNamesystem implements FSC
   
       long totalInodes = this.dir.totalInodes();
       long totalBlocks = this.getBlocksTotal();
-  
-      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-      this.DFSNodesStatus(live, dead);
-      
-      String str = totalInodes + " files and directories, " + totalBlocks
-          + " blocks = " + (totalInodes + totalBlocks) + " total";
-      out.println(str);
+      out.println(totalInodes + " files and directories, " + totalBlocks
+          + " blocks = " + (totalInodes + totalBlocks) + " total");
+
+      final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      blockManager.getDatanodeManager().fetchDatanodes(live, dead, false);
       out.println("Live Datanodes: "+live.size());
       out.println("Dead Datanodes: "+dead.size());
       blockManager.metaSave(out);
@@ -750,7 +743,9 @@ public class FSNamesystem implements FSC
           }
           dir.setTimes(src, inode, -1, now, false);
         }
-        return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+        return blockManager.createLocatedBlocks(inode.getBlocks(),
+            inode.computeFileSize(false), inode.isUnderConstruction(),
+            offset, length, needBlockToken);
       } finally {
         if (attempt == 0) {
           readUnlock();
@@ -761,44 +756,6 @@ public class FSNamesystem implements FSC
     }
     return null; // can never reach here
   }
-  
-  LocatedBlocks getBlockLocationsInternal(INodeFile inode,
-      long offset, long length, boolean needBlockToken)
-  throws IOException {
-    assert hasReadOrWriteLock();
-    final BlockInfo[] blocks = inode.getBlocks();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
-    }
-    if (blocks == null) {
-      return null;
-    }
-
-    if (blocks.length == 0) {
-      return new LocatedBlocks(0, inode.isUnderConstruction(),
-          Collections.<LocatedBlock>emptyList(), null, false);
-    } else {
-      final long n = inode.computeFileSize(false);
-      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
-          blocks, offset, length, Integer.MAX_VALUE);
-      final BlockInfo last = inode.getLastBlock();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("last = " + last);
-      }
-      
-      LocatedBlock lastBlock = last.isComplete() ? blockManager
-          .getBlockLocation(last, n - last.getNumBytes()) : blockManager
-          .getBlockLocation(last, n);
-          
-      if (blockManager.isBlockTokenEnabled() && needBlockToken) {
-        blockManager.setBlockTokens(locatedblocks);
-        blockManager.setBlockToken(lastBlock);
-      }
-      return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-          lastBlock, last.isComplete());
-    }
-  }
-  
 
   /**
    * Moves all the blocks from srcs and appends them to trg
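
The getBlockLocationsInternal method removed above is folded into BlockManager.createLocatedBlocks. One detail worth preserving for readers is its last-block offset arithmetic: a complete last block starts at n - last.getNumBytes(), an under-construction one at n, where n is the value of inode.computeFileSize(false). A runnable model of just that ternary; the method and variable names are illustrative:

// Sketch (not part of this patch): the last-block offset logic from the
// removed code, mirroring
//   last.isComplete() ? getBlockLocation(last, n - last.getNumBytes())
//                     : getBlockLocation(last, n)
public class LastBlockOffsetSketch {
  static long lastBlockOffset(long n, long lastBlockLen, boolean lastIsComplete) {
    return lastIsComplete ? n - lastBlockLen : n;
  }

  public static void main(String[] args) {
    System.out.println(lastBlockOffset(256L, 64L, true));  // 192: complete last block
    System.out.println(lastBlockOffset(192L, 64L, false)); // 192: open block at EOF
  }
}
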
@@ -960,7 +917,7 @@ public class FSNamesystem implements FSC
   * The access time is precise up to an hour. The transaction, if needed, is
    * written to the edits log but is not flushed.
    */
-  public void setTimes(String src, long mtime, long atime) 
+  void setTimes(String src, long mtime, long atime) 
     throws IOException, UnresolvedLinkException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
@@ -1060,60 +1017,37 @@ public class FSNamesystem implements FSC
    * @return true if successful; 
    *         false if file does not exist or is a directory
    */
-  public boolean setReplication(String src, short replication) 
-    throws IOException, UnresolvedLinkException {
-    boolean status = false;
+  boolean setReplication(final String src, final short replication
+      ) throws IOException {
+    blockManager.verifyReplication(src, replication, null);
+
+    final boolean isFile;
     writeLock();
     try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
-      status = setReplicationInternal(src, replication);
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+
+      final short[] oldReplication = new short[1];
+      final Block[] blocks = dir.setReplication(src, replication, oldReplication);
+      isFile = blocks != null;
+      if (isFile) {
+        blockManager.setReplication(oldReplication[0], replication, src, blocks);
+      }
     } finally {
       writeUnlock();
     }
+
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setReplication", src, null, null);
     }
-    return status;
-  }
-
-  private boolean setReplicationInternal(String src,
-      short replication) throws AccessControlException, QuotaExceededException,
-      SafeModeException, UnresolvedLinkException, IOException {
-    assert hasWriteLock();
-    blockManager.verifyReplication(src, replication, null);
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-
-    int[] oldReplication = new int[1];
-    Block[] fileBlocks;
-    fileBlocks = dir.setReplication(src, replication, oldReplication);
-    if (fileBlocks == null)  // file not found or is a directory
-      return false;
-    int oldRepl = oldReplication[0];
-    if (oldRepl == replication) // the same replication
-      return true;
-
-    // update needReplication priority queues
-    for(int idx = 0; idx < fileBlocks.length; idx++)
-      blockManager.updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
-      
-    if (oldRepl > replication) {  
-      // old replication > the new one; need to remove copies
-      LOG.info("Reducing replication for file " + src 
-               + ". New replication is " + replication);
-      for(int idx = 0; idx < fileBlocks.length; idx++)
-        blockManager.processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
-    } else { // replication factor is increased
-      LOG.info("Increasing replication for file " + src 
-          + ". New replication is " + replication);
-    }
-    return true;
+    return isFile;
   }
     
   long getPreferredBlockSize(String filename) 
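
The rewritten setReplication above keeps FSDirectory.setReplication's one-element-array out-parameter while narrowing it from int[] to short[]. Java has no out arguments, so the callee writes the previous value into a caller-supplied array, exactly as the new '+' lines do with oldReplication. A standalone model of the idiom; all names here are hypothetical:

// Sketch (not part of this patch): the one-element-array out-parameter idiom.
public class OutParamSketch {
  static boolean setValue(short[] store, int index, short newValue, short[] oldValue) {
    if (index < 0 || index >= store.length) {
      return false;              // analogous to "file not found or is a directory"
    }
    oldValue[0] = store[index];  // report the previous setting to the caller
    store[index] = newValue;
    return true;
  }

  public static void main(String[] args) {
    short[] store = {3};
    final short[] old = new short[1];
    if (setValue(store, 0, (short) 2, old)) {
      System.out.println("replication " + old[0] + " -> " + store[0]); // 3 -> 2
    }
  }
}
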
@@ -1287,9 +1221,8 @@ public class FSNamesystem implements FSC
         LocatedBlock lb = 
           blockManager.convertLastBlockToUnderConstruction(cons);
 
-        if (lb != null && blockManager.isBlockTokenEnabled()) {
-          lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(), 
-              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
+        if (lb != null) {
+          blockManager.setBlockToken(lb, AccessMode.WRITE);
         }
         return lb;
       } else {
@@ -1456,7 +1389,7 @@ public class FSNamesystem implements FSC
     try {
       lb = startFileInternal(src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
-                        false, (short)blockManager.maxReplication, (long)0);
+                        false, blockManager.maxReplication, (long)0);
     } finally {
       writeUnlock();
     }
@@ -1577,10 +1510,7 @@ public class FSNamesystem implements FSC
 
     // Create next block
     LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
-    if (blockManager.isBlockTokenEnabled()) {
-      b.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(b.getBlock(), 
-          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
-    }
+    blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
     return b;
   }
 
@@ -1626,17 +1556,14 @@ public class FSNamesystem implements FSC
         ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
         excludes, preferredblocksize);
     final LocatedBlock lb = new LocatedBlock(blk, targets);
-    if (blockManager.isBlockTokenEnabled()) {
-      lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(), 
-          EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
-    }
+    blockManager.setBlockToken(lb, AccessMode.COPY);
     return lb;
   }
 
   /**
    * The client would like to let go of the given block
    */
-  public boolean abandonBlock(ExtendedBlock b, String src, String holder)
+  boolean abandonBlock(ExtendedBlock b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
     writeLock();
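
The two hunks above (and several below) replace an inline isBlockTokenEnabled() guard plus generateToken(..., EnumSet.of(mode)) call with a single blockManager.setBlockToken(lb, mode) helper. The helper's body is not shown in this part of the mail; judging from the removed call sites, it presumably wraps the guard and the token generation. A generic model of that consolidation; the types and names are hypothetical stand-ins, not HDFS classes:

// Sketch (not part of this patch): folding a repeated "if enabled, generate
// and attach" pattern into one helper, as the patch does for block tokens.
import java.util.EnumSet;

public class TokenHelperSketch {
  enum AccessMode { READ, WRITE, COPY }

  static boolean tokenEnabled = true;  // stands in for isBlockTokenEnabled()

  static void attachToken(StringBuilder block, AccessMode mode) {
    if (tokenEnabled) {
      block.append(" token=").append(EnumSet.of(mode));  // stands in for generateToken
    }
  }

  public static void main(String[] args) {
    StringBuilder lb = new StringBuilder("blk_1001");
    attachToken(lb, AccessMode.WRITE);
    System.out.println(lb); // blk_1001 token=[WRITE]
  }
}
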
@@ -1821,23 +1748,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-
-  /**
-   * Mark the block belonging to datanode as corrupt
-   * @param blk Block to be marked as corrupt
-   * @param dn Datanode which holds the corrupt replica
-   */
-  public void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
-    throws IOException {
-    writeLock();
-    try {
-      blockManager.findAndMarkBlockAsCorrupt(blk.getLocalBlock(), dn);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-
   ////////////////////////////////////////////////////////////////
   // Here's how to handle block-copy failure during client write:
   // -- As usual, the client's write should result in a streaming
@@ -2621,16 +2531,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-  public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
-      final DatanodeDescriptor nodeinfo) {
-    // check access key update
-    if (blockManager.isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
-      cmds.add(new KeyUpdateCommand(blockManager.getBlockTokenSecretManager().exportKeys()));
-      nodeinfo.needKeyUpdate = false;
-    }
-  }
-
-
   /**
    * Returns whether or not there were available resources at the last check of
    * resources.
@@ -2688,179 +2588,7 @@ public class FSNamesystem implements FSC
 
   FSEditLog getEditLog() {
     return getFSImage().getEditLog();
-  }
-    
-  /**
-   * The given node is reporting all its blocks.  Use this info to 
-   * update the (machine-->blocklist) and (block-->machinelist) tables.
-   */
-  void processReport(DatanodeID nodeID, String poolId,
-      BlockListAsLongs newReport) throws IOException {
-    long startTime, endTime;
-
-    writeLock();
-    startTime = now(); //after acquiring write lock
-    try {
-      final DatanodeDescriptor node = blockManager.getDatanodeManager(
-          ).getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        throw new IOException("ProcessReport from dead or unregistered node: "
-                              + nodeID.getName());
-      }
-      // To minimize startup time, we discard any second (or later) block reports
-      // that we receive while still in startup phase.
-      if (isInStartupSafeMode() && node.numBlocks() > 0) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
-            + "discarded non-initial block report from " + nodeID.getName()
-            + " because namenode still in startup phase");
-        return;
-      }
-  
-      blockManager.processReport(node, newReport);
-    } finally {
-      endTime = now();
-      writeUnlock();
-    }
-
-    // Log the block report processing stats from Namenode perspective
-    NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
-        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
-        + ", processing time: " + (endTime - startTime) + " msecs");
-  }
-
-  /**
-   * We want "replication" replicates for the block, but we now have too many.  
-   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
-   *
-   * srcNodes.size() - dstNodes.size() == replication
-   *
-   * We pick nodes so that replicas are spread across racks, and we also
-   * try hard to pick one with the least free space.
-   * The algorithm is first to pick a node with least free space from nodes
-   * that are on a rack holding more than one replica of the block.
-   * So removing such a replica won't remove a rack. 
-   * If no such node is available,
-   * then pick the node with the least free space.
-   */
-  public void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
-                              Block b, short replication,
-                              DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint,
-                              BlockPlacementPolicy replicator) {
-    assert hasWriteLock();
-    // first form a rack to datanodes map and
-    INodeFile inode = blockManager.getINode(b);
-    HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
-      new HashMap<String, ArrayList<DatanodeDescriptor>>();
-    for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
-         iter.hasNext();) {
-      DatanodeDescriptor node = iter.next();
-      String rackName = node.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if(datanodeList==null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-      }
-      datanodeList.add(node);
-      rackMap.put(rackName, datanodeList);
-    }
-    
-    // split nodes into two sets
-    // priSet contains nodes on rack with more than one replica
-    // remains contains the remaining nodes
-    ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
-    for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter = 
-      rackMap.entrySet().iterator(); iter.hasNext(); ) {
-      Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
-      ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue(); 
-      if( datanodeList.size() == 1 ) {
-        remains.add(datanodeList.get(0));
-      } else {
-        priSet.addAll(datanodeList);
-      }
-    }
-    
-    // pick one node to delete that favors the delete hint
-    // otherwise pick one with least space from priSet if it is not empty
-    // otherwise one node with least space from remains
-    boolean firstOne = true;
-    while (nonExcess.size() - replication > 0) {
-      DatanodeInfo cur = null;
-
-      // check if we can del delNodeHint
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
-            (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
-          cur = delNodeHint;
-      } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
-      }
-
-      firstOne = false;
-      // adjust rackmap, priSet, and remains
-      String rack = cur.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
-      datanodes.remove(cur);
-      if(datanodes.isEmpty()) {
-        rackMap.remove(rack);
-      }
-      if( priSet.remove(cur) ) {
-        if (datanodes.size() == 1) {
-          priSet.remove(datanodes.get(0));
-          remains.add(datanodes.get(0));
-        }
-      } else {
-        remains.remove(cur);
-      }
-
-      nonExcess.remove(cur);
-      blockManager.addToExcessReplicate(cur, b);
-
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.  
-      //
-      // The 'invalidate' list is used to inform the datanode the block 
-      // should be deleted.  Items are removed from the invalidate list
-      // upon giving instructions to the namenode.
-      //
-      blockManager.addToInvalidates(b, cur);
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
-                +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
-    }
-  }
-
-
-  /**
-   * The given node is reporting that it received a certain block.
-   */
-  public void blockReceived(DatanodeID nodeID,  
-                                         String poolId,
-                                         Block block,
-                                         String delHint
-                                         ) throws IOException {
-    writeLock();
-    try {
-      final DatanodeDescriptor node = blockManager.getDatanodeManager(
-          ).getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
-            + " is received from dead or unregistered node " + nodeID.getName());
-        throw new IOException(
-            "Got blockReceived message from unregistered or dead node " + block);
-      }
-          
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
-                                      +block+" is received from " + nodeID.getName());
-      }
-  
-      blockManager.addBlock(node, block, delHint);
-    } finally {
-      writeUnlock();
-    }
-  }
+  }
 
   private void checkBlock(ExtendedBlock block) throws IOException {
     if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
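
The chooseExcessReplicates removed above moves into BlockManager, and its javadoc is the best description of the algorithm: bucket the candidate replicas by rack, split them into a priority set (nodes on racks holding more than one replica, so a deletion there cannot lose a rack) and the remainder, then delete from the priority set first. A compact, runnable model of the grouping step, using plain JDK types and a modern computeIfAbsent in place of the removed get/put sequence; node and rack names are made up:

// Sketch (not part of this patch): the rackMap / priSet / remains split
// from the removed chooseExcessReplicates, with strings standing in for
// DatanodeDescriptor.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExcessReplicaGrouping {
  public static void main(String[] args) {
    Map<String, String> nodeToRack = Map.of(
        "dn1", "/rack1", "dn2", "/rack1", "dn3", "/rack2");

    Map<String, List<String>> rackMap = new HashMap<>();
    for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
      rackMap.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());
    }

    List<String> priSet = new ArrayList<>();   // racks holding more than one replica
    List<String> remains = new ArrayList<>();  // sole replica on its rack
    for (List<String> nodes : rackMap.values()) {
      (nodes.size() > 1 ? priSet : remains).addAll(nodes);
    }
    System.out.println("priSet=" + priSet + " remains=" + remains);
    // e.g. priSet=[dn1, dn2] remains=[dn3] (iteration order may vary)
  }
}
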
@@ -3009,43 +2737,10 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /**
-   */
-  public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
-                                          ArrayList<DatanodeDescriptor> dead) {
-    readLock();
-    try {
-      getBlockManager().getDatanodeManager().fetchDatanodess(live, dead);
-    } finally {
-      readUnlock();
-    }
-  }
-
   public Date getStartTime() {
     return new Date(systemStart); 
   }
     
-  short getMaxReplication()     { return (short)blockManager.maxReplication; }
-  short getMinReplication()     { return (short)blockManager.minReplication; }
-  short getDefaultReplication() { return (short)blockManager.defaultReplication; }
-
-  /**
-   * Clamp the specified replication between the minimum and maximum
-   * replication levels for this namesystem.
-   */
-  short adjustReplication(short replication) {
-    short minReplication = getMinReplication();
-    if (replication < minReplication) {
-      replication = minReplication;
-    }
-    short maxReplication = getMaxReplication();
-    if (replication > maxReplication) {
-      replication = maxReplication;
-    }
-    return replication;
-  }
-    
-  
   /**
    * Rereads the config to get hosts and exclude list file names.
    * Rereads the files to update the hosts and exclude lists.  It
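
The DFSNodesStatus wrapper removed above is replaced throughout the patch by direct calls to DatanodeManager.fetchDatanodes(live, dead, removeDecommissionNode). The new call sites show the contract: pass null for whichever output list is not needed (getLiveNodes and getDeadNodes below each pass one null), and the boolean appears to take over the removeDecomNodeFromList filtering that callers previously did themselves. A runnable model of that nullable-output-list convention; the behavior inside fetch is invented for illustration:

// Sketch (not part of this patch): fill only the lists the caller supplied.
import java.util.ArrayList;
import java.util.List;

public class FetchDatanodesSketch {
  static void fetch(List<String> live, List<String> dead, boolean skipDecommissioned) {
    if (live != null) {
      live.add("dn1");
      if (!skipDecommissioned) {
        live.add("dn2(decommissioned)");
      }
    }
    if (dead != null) {
      dead.add("dn9");
    }
  }

  public static void main(String[] args) {
    List<String> live = new ArrayList<>();
    fetch(live, null, true);   // dead list not wanted, as in getLiveNodes()
    System.out.println(live);  // [dn1]
  }
}
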
@@ -3740,10 +3435,6 @@ public class FSNamesystem implements FSC
       writeUnlock();
     }
   }
-  
-  public RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
-    return getEditLog().getEditLogManifest(sinceTxId);
-  }
 
   NamenodeCommand startCheckpoint(
                                 NamenodeRegistration bnReg, // backup node
@@ -3968,7 +3659,7 @@ public class FSNamesystem implements FSC
   /**
    * shutdown FSNamesystem
    */
-  public void shutdown() {
+  void shutdown() {
     if (mbeanName != null)
       MBeans.unregister(mbeanName);
   }
@@ -4069,10 +3760,7 @@ public class FSNamesystem implements FSC
       // get a new generation stamp and an access token
       block.setGenerationStamp(nextGenerationStamp());
       locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-      if (blockManager.isBlockTokenEnabled()) {
-        locatedBlock.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(
-          block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
-      }
+      blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
     } finally {
       writeUnlock();
     }
@@ -4273,26 +3961,6 @@ public class FSNamesystem implements FSC
     return blockManager.numCorruptReplicas(blk);
   }
 
-  /**
-   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
-   * blocks starting at the next block after startingBlockId are returned
-   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
-   * is null, up to numExpectedBlocks blocks are returned from the beginning.
-   * If startingBlockId cannot be found, null is returned.
-   *
-   * @param numExpectedBlocks Number of block ids to return.
-   *  0 <= numExpectedBlocks <= 100
-   * @param startingBlockId Block id from which to start. If null, start at
-   *  beginning.
-   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
-   *
-   */
-  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
-                                   Long startingBlockId) {  
-    return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks,
-                                                  startingBlockId);
-  }
-  
   static class CorruptFileBlockInfo {
     String path;
     Block block;
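
The getCorruptReplicaBlockIds wrapper removed above was already a pure delegation to BlockManager; its javadoc, though, documents a paging contract worth keeping in mind: return up to numExpectedBlocks ids strictly after startingBlockId, start from the beginning when startingBlockId is null, and return null when startingBlockId cannot be found. A runnable model of that contract over an ordered id set; this is an illustration, not the actual implementation behind BlockManager:

// Sketch (not part of this patch): paging through ordered block ids per the
// contract described in the removed javadoc.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class CorruptIdPagingSketch {
  static long[] page(NavigableSet<Long> ids, int max, Long startingId) {
    if (startingId != null && !ids.contains(startingId)) {
      return null;  // starting id cannot be found
    }
    List<Long> out = new ArrayList<>();
    for (long id : startingId == null ? ids : ids.tailSet(startingId, false)) {
      if (out.size() >= max) {
        break;
      }
      out.add(id);
    }
    return out.stream().mapToLong(Long::longValue).toArray();
  }

  public static void main(String[] args) {
    NavigableSet<Long> ids = new TreeSet<>(List.of(101L, 102L, 105L, 107L));
    System.out.println(Arrays.toString(page(ids, 2, 102L))); // [105, 107]
  }
}
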
@@ -4355,28 +4023,6 @@ public class FSNamesystem implements FSC
   }
   
   /**
-   * @return list of datanodes where decommissioning is in progress
-   */
-  public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
-    readLock();
-    try {
-      ArrayList<DatanodeDescriptor> decommissioningNodes = 
-        new ArrayList<DatanodeDescriptor>();
-      final List<DatanodeDescriptor> results = getBlockManager(
-          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
-      for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        if (node.isDecommissionInProgress()) {
-          decommissioningNodes.add(node);
-        }
-      }
-      return decommissioningNodes;
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
    * Create delegation token secret manager
    */
   private DelegationTokenSecretManager createDelegationTokenSecretManager(
@@ -4406,7 +4052,7 @@ public class FSNamesystem implements FSC
    * @return Token<DelegationTokenIdentifier>
    * @throws IOException
    */
-  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+  Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
     Token<DelegationTokenIdentifier> token;
     writeLock();
@@ -4686,13 +4332,9 @@ public class FSNamesystem implements FSC
   public String getLiveNodes() {
     final Map<String, Map<String,Object>> info = 
       new HashMap<String, Map<String,Object>>();
-    final ArrayList<DatanodeDescriptor> liveNodeList = 
-      new ArrayList<DatanodeDescriptor>();
-    final ArrayList<DatanodeDescriptor> deadNodeList =
-      new ArrayList<DatanodeDescriptor>();
-    DFSNodesStatus(liveNodeList, deadNodeList);
-    removeDecomNodeFromList(liveNodeList);
-    for (DatanodeDescriptor node : liveNodeList) {
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
+    for (DatanodeDescriptor node : live) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
       innerinfo.put("usedSpace", getDfsUsed(node));
@@ -4710,14 +4352,9 @@ public class FSNamesystem implements FSC
   public String getDeadNodes() {
     final Map<String, Map<String, Object>> info = 
       new HashMap<String, Map<String, Object>>();
-    final ArrayList<DatanodeDescriptor> liveNodeList =
-    new ArrayList<DatanodeDescriptor>();
-    final ArrayList<DatanodeDescriptor> deadNodeList =
-    new ArrayList<DatanodeDescriptor>();
-    // we need to call DFSNodeStatus to filter out the dead data nodes
-    DFSNodesStatus(liveNodeList, deadNodeList);
-    removeDecomNodeFromList(deadNodeList);
-    for (DatanodeDescriptor node : deadNodeList) {
+    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    blockManager.getDatanodeManager().fetchDatanodes(null, dead, true);
+    for (DatanodeDescriptor node : dead) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
       innerinfo.put("decommissioned", node.isDecommissioned());
@@ -4734,8 +4371,8 @@ public class FSNamesystem implements FSC
   public String getDecomNodes() {
     final Map<String, Map<String, Object>> info = 
       new HashMap<String, Map<String, Object>>();
-    final ArrayList<DatanodeDescriptor> decomNodeList = 
-      this.getDecommissioningNodes();
+    final List<DatanodeDescriptor> decomNodeList = blockManager.getDatanodeManager(
+        ).getDecommissioningNodes();
     for (DatanodeDescriptor node : decomNodeList) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("underReplicatedBlocks", node.decommissioningStatus
@@ -4771,18 +4408,4 @@ public class FSNamesystem implements FSC
   public BlockManager getBlockManager() {
     return blockManager;
   }
-  
-  void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
-    getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
-  }
-
-  /**
-   * Tell all datanodes to use a new, non-persistent bandwidth value for
-   * dfs.datanode.balance.bandwidthPerSec.
-   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
-   * @throws IOException
-   */
-  public void setBalancerBandwidth(long bandwidth) throws IOException {
-    getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
-  }
 }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Mon Aug  8 10:06:45 2011
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRes
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -59,13 +60,12 @@ public class FsckServlet extends DfsServ
           NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
           
           final FSNamesystem namesystem = nn.getNamesystem();
+          final BlockManager bm = namesystem.getBlockManager();
           final int totalDatanodes = 
-            namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE); 
-          final short minReplication = namesystem.getMinReplication();
-
+              namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE); 
           new NamenodeFsck(conf, nn,
-              NamenodeJspHelper.getNetworkTopology(nn), pmap, out,
-              totalDatanodes, minReplication, remoteAddress).fsck();
+              bm.getDatanodeManager().getNetworkTopology(), pmap, out,
+              totalDatanodes, bm.minReplication, remoteAddress).fsck();
           
           return null;
         }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1154899&r1=1154898&r2=1154899&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Aug  8 10:06:45 2011
@@ -855,7 +855,7 @@ public class NameNode implements Namenod
       DatanodeInfo[] nodes = blocks[i].getLocations();
       for (int j = 0; j < nodes.length; j++) {
         DatanodeInfo dn = nodes[j];
-        namesystem.markBlockAsCorrupt(blk, dn);
+        namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
       }
     }
   }
@@ -1055,7 +1055,7 @@ public class NameNode implements Namenod
   @Override
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
   throws IOException {
-    return namesystem.getEditLogManifest(sinceTxId);
+    return namesystem.getEditLog().getEditLogManifest(sinceTxId);
   }
     
   @Override // ClientProtocol
@@ -1096,8 +1096,9 @@ public class NameNode implements Namenod
   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
    * @throws IOException
    */
+  @Override // ClientProtocol
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    namesystem.setBalancerBandwidth(bandwidth);
+    namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
   }
   
   @Override // ClientProtocol
@@ -1195,7 +1196,7 @@ public class NameNode implements Namenod
            + " blocks");
     }
 
-    namesystem.processReport(nodeReg, poolId, blist);
+    namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
     if (getFSImage().isUpgradeFinalized())
       return new DatanodeCommand.Finalize(poolId);
     return null;
@@ -1210,7 +1211,8 @@ public class NameNode implements Namenod
           +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
     }
     for (int i = 0; i < blocks.length; i++) {
-      namesystem.blockReceived(nodeReg, poolId, blocks[i], delHints[i]);
+      namesystem.getBlockManager().blockReceived(
+          nodeReg, poolId, blocks[i], delHints[i]);
     }
   }
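
Taken together, these NameNode hunks finish the theme of the patch: FSNamesystem pass-through wrappers (markBlockAsCorrupt, getEditLogManifest, setBalancerBandwidth, processReport, blockReceived) are deleted, and callers navigate to the owning component instead. A tiny before/after model of that facade removal; all names here are hypothetical:

// Sketch (not part of this patch): replacing a pass-through wrapper with
// direct access to the owning component.
public class FacadeRemovalSketch {
  static class Manager {
    void process(String report) {
      System.out.println("processed " + report);
    }
  }

  static class Namesystem {
    private final Manager manager = new Manager();
    Manager getManager() { return manager; }
    // Before: void process(String r) { manager.process(r); }  // wrapper, now gone
  }

  public static void main(String[] args) {
    Namesystem ns = new Namesystem();
    ns.getManager().process("blockReport");  // after: one getter hop, no wrapper
  }
}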
 


