hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1158072 [4/7] - in /hadoop/common/branches/HDFS-1623/hdfs: ./ ivy/ src/c++/libhdfs/ src/contrib/ src/contrib/fuse-dfs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/ser...
Date: Tue, 16 Aug 2011 00:37:25 GMT
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Tue Aug 16 00:37:15 2011
@@ -37,9 +37,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
@@ -54,17 +54,13 @@ class FSImageTransactionalStorageInspect
   private boolean needToSave = false;
   private boolean isUpgradeFinalized = true;
   
-  List<FoundFSImage> foundImages = new ArrayList<FoundFSImage>();
-  List<FoundEditLog> foundEditLogs = new ArrayList<FoundEditLog>();
+  List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
+  List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
   SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
   long maxSeenTxId = 0;
   
   private static final Pattern IMAGE_REGEX = Pattern.compile(
     NameNodeFile.IMAGE.getName() + "_(\\d+)");
-  private static final Pattern EDITS_REGEX = Pattern.compile(
-    NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
-  private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
-    NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
 
   @Override
   public void inspectDirectory(StorageDirectory sd) throws IOException {
@@ -95,7 +91,7 @@ class FSImageTransactionalStorageInspect
         if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
           try {
             long txid = Long.valueOf(imageMatch.group(1));
-            foundImages.add(new FoundFSImage(sd, f, txid));
+            foundImages.add(new FSImageFile(sd, f, txid));
           } catch (NumberFormatException nfe) {
             LOG.error("Image file " + f + " has improperly formatted " +
                       "transaction ID");
@@ -117,9 +113,10 @@ class FSImageTransactionalStorageInspect
       LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
     }
     
-    List<FoundEditLog> editLogs = matchEditLogs(filesInStorage);
+    List<EditLogFile> editLogs 
+      = FileJournalManager.matchEditLogs(filesInStorage);
     if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      for (FoundEditLog log : editLogs) {
+      for (EditLogFile log : editLogs) {
         addEditLog(log);
       }
     } else if (!editLogs.isEmpty()){
@@ -133,47 +130,12 @@ class FSImageTransactionalStorageInspect
     isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
   }
 
-  static List<FoundEditLog> matchEditLogs(File[] filesInStorage) {
-    List<FoundEditLog> ret = Lists.newArrayList();
-    for (File f : filesInStorage) {
-      String name = f.getName();
-      // Check for edits
-      Matcher editsMatch = EDITS_REGEX.matcher(name);
-      if (editsMatch.matches()) {
-        try {
-          long startTxId = Long.valueOf(editsMatch.group(1));
-          long endTxId = Long.valueOf(editsMatch.group(2));
-          ret.add(new FoundEditLog(f, startTxId, endTxId));
-        } catch (NumberFormatException nfe) {
-          LOG.error("Edits file " + f + " has improperly formatted " +
-                    "transaction ID");
-          // skip
-        }          
-      }
-      
-      // Check for in-progress edits
-      Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);
-      if (inProgressEditsMatch.matches()) {
-        try {
-          long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
-          ret.add(
-            new FoundEditLog(f, startTxId, FoundEditLog.UNKNOWN_END));
-        } catch (NumberFormatException nfe) {
-          LOG.error("In-progress edits file " + f + " has improperly " +
-                    "formatted transaction ID");
-          // skip
-        }          
-      }
-    }
-    return ret;
-  }
-
-  private void addEditLog(FoundEditLog foundEditLog) {
+  private void addEditLog(EditLogFile foundEditLog) {
     foundEditLogs.add(foundEditLog);
-    LogGroup group = logGroups.get(foundEditLog.startTxId);
+    LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
     if (group == null) {
-      group = new LogGroup(foundEditLog.startTxId);
-      logGroups.put(foundEditLog.startTxId, group);
+      group = new LogGroup(foundEditLog.getFirstTxId());
+      logGroups.put(foundEditLog.getFirstTxId(), group);
     }
     group.add(foundEditLog);
   }
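
The filename matching that matchEditLogs performed now lives in
FileJournalManager.matchEditLogs. A minimal standalone sketch of that
logic, reconstructed from the regexes and loop removed above; the literal
prefixes stand in for the NameNodeFile.EDITS and
NameNodeFile.EDITS_INPROGRESS constants:

    import java.io.File;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class EditLogNameMatcher {
      static final long UNKNOWN_END = -1;
      // Finalized segments are named edits_<first>-<last>,
      // open segments edits_inprogress_<first>.
      static final Pattern EDITS =
          Pattern.compile("edits_(\\d+)-(\\d+)");
      static final Pattern EDITS_INPROGRESS =
          Pattern.compile("edits_inprogress_(\\d+)");

      /** Returns {firstTxId, lastTxId}, or null if not an edit log name. */
      static long[] match(File f) {
        String name = f.getName();
        Matcher m = EDITS.matcher(name);
        if (m.matches()) {
          return new long[] { Long.parseLong(m.group(1)),
                              Long.parseLong(m.group(2)) };
        }
        m = EDITS_INPROGRESS.matcher(name);
        if (m.matches()) {
          // End txid is unknown until the segment is finalized.
          return new long[] { Long.parseLong(m.group(1)), UNKNOWN_END };
        }
        return null;
      }
    }

A finalized segment encodes both endpoints in its name; an in-progress
segment encodes only the start, which is why the removed code filled in
FoundEditLog.UNKNOWN_END for the end txid.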
@@ -191,9 +153,9 @@ class FSImageTransactionalStorageInspect
    * 
    * Returns null if no images were found.
    */
-  FoundFSImage getLatestImage() {
-    FoundFSImage ret = null;
-    for (FoundFSImage img : foundImages) {
+  FSImageFile getLatestImage() {
+    FSImageFile ret = null;
+    for (FSImageFile img : foundImages) {
       if (ret == null || img.txId > ret.txId) {
         ret = img;
       }
@@ -201,11 +163,11 @@ class FSImageTransactionalStorageInspect
     return ret;
   }
   
-  public List<FoundFSImage> getFoundImages() {
+  public List<FSImageFile> getFoundImages() {
     return ImmutableList.copyOf(foundImages);
   }
   
-  public List<FoundEditLog> getFoundEditLogs() {
+  public List<EditLogFile> getEditLogFiles() {
     return ImmutableList.copyOf(foundEditLogs);
   }
 
@@ -215,7 +177,7 @@ class FSImageTransactionalStorageInspect
       throw new FileNotFoundException("No valid image files found");
     }
 
-    FoundFSImage recoveryImage = getLatestImage();
+    FSImageFile recoveryImage = getLatestImage();
     LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
 
     return new TransactionalLoadPlan(recoveryImage,
@@ -233,7 +195,7 @@ class FSImageTransactionalStorageInspect
   LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
     long expectedTxId = sinceTxId + 1;
     
-    List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>();
+    List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();
     
     SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
     if (logGroups.size() > tailGroups.size()) {
@@ -306,22 +268,6 @@ class FSImageTransactionalStorageInspect
     return needToSave;
   }
   
-  
-  RemoteEditLogManifest getEditLogManifest(long sinceTxId) {
-    List<RemoteEditLog> logs = Lists.newArrayList();
-    for (LogGroup g : logGroups.values()) {
-      if (!g.hasFinalized) continue;
-
-      FoundEditLog fel = g.getBestNonCorruptLog();
-      if (fel.getLastTxId() < sinceTxId) continue;
-      
-      logs.add(new RemoteEditLog(fel.getStartTxId(),
-          fel.getLastTxId()));
-    }
-    
-    return new RemoteEditLogManifest(logs);
-  }
-
   /**
    * A group of logs that all start at the same txid.
    * 
@@ -330,7 +276,7 @@ class FSImageTransactionalStorageInspect
    */
   static class LogGroup {
     long startTxId;
-    List<FoundEditLog> logs = new ArrayList<FoundEditLog>();;
+    List<EditLogFile> logs = new ArrayList<EditLogFile>();
     private Set<Long> endTxIds = new TreeSet<Long>();
     private boolean hasInProgress = false;
     private boolean hasFinalized = false;
@@ -339,15 +285,15 @@ class FSImageTransactionalStorageInspect
       this.startTxId = startTxId;
     }
     
-    FoundEditLog getBestNonCorruptLog() {
+    EditLogFile getBestNonCorruptLog() {
       // First look for non-corrupt finalized logs
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         if (!log.isCorrupt() && !log.isInProgress()) {
           return log;
         }
       }
       // Then look for non-corrupt in-progress logs
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         if (!log.isCorrupt()) {
           return log;
         }
@@ -364,7 +310,7 @@ class FSImageTransactionalStorageInspect
      * @return true if we can determine the last txid in this log group.
      */
     boolean hasKnownLastTxId() {
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         if (!log.isInProgress()) {
           return true;
         }
@@ -378,24 +324,24 @@ class FSImageTransactionalStorageInspect
      *                               {@see #hasKnownLastTxId()}
      */
     long getLastTxId() {
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         if (!log.isInProgress()) {
-          return log.lastTxId;
+          return log.getLastTxId();
         }
       }
       throw new IllegalStateException("LogGroup only has in-progress logs");
     }
 
     
-    void add(FoundEditLog log) {
-      assert log.getStartTxId() == startTxId;
+    void add(EditLogFile log) {
+      assert log.getFirstTxId() == startTxId;
       logs.add(log);
       
       if (log.isInProgress()) {
         hasInProgress = true;
       } else {
         hasFinalized = true;
-        endTxIds.add(log.lastTxId);
+        endTxIds.add(log.getLastTxId());
       }
     }
     
@@ -422,7 +368,7 @@ class FSImageTransactionalStorageInspect
      * The in-progress logs in this case should be considered corrupt.
      */
     private void planMixedLogRecovery() throws IOException {
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         if (log.isInProgress()) {
           LOG.warn("Log at " + log.getFile() + " is in progress, but " +
                    "other logs starting at the same txid " + startTxId +
@@ -446,7 +392,7 @@ class FSImageTransactionalStorageInspect
                "crash)");
       if (logs.size() == 1) {
         // Only one log, it's our only choice!
-        FoundEditLog log = logs.get(0);
+        EditLogFile log = logs.get(0);
         if (log.validateLog().numTransactions == 0) {
           // If it has no transactions, we should consider it corrupt just
           // to be conservative.
@@ -459,7 +405,7 @@ class FSImageTransactionalStorageInspect
       }
 
       long maxValidTxnCount = Long.MIN_VALUE;
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         long validTxnCount = log.validateLog().numTransactions;
         LOG.warn("  Log " + log.getFile() +
             " valid txns=" + validTxnCount +
@@ -467,7 +413,7 @@ class FSImageTransactionalStorageInspect
         maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
       }        
 
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         long txns = log.validateLog().numTransactions;
         if (txns < maxValidTxnCount) {
           LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
@@ -499,7 +445,7 @@ class FSImageTransactionalStorageInspect
     }
 
     void recover() throws IOException {
-      for (FoundEditLog log : logs) {
+      for (EditLogFile log : logs) {
         if (log.isCorrupt()) {
           log.moveAsideCorruptFile();
         } else if (log.isInProgress()) {
@@ -508,131 +454,12 @@ class FSImageTransactionalStorageInspect
       }
     }    
   }
-
-  /**
-   * Record of an image that has been located and had its filename parsed.
-   */
-  static class FoundFSImage {
-    final StorageDirectory sd;    
-    final long txId;
-    private final File file;
-    
-    FoundFSImage(StorageDirectory sd, File file, long txId) {
-      assert txId >= 0 : "Invalid txid on " + file +": " + txId;
-      
-      this.sd = sd;
-      this.txId = txId;
-      this.file = file;
-    } 
-    
-    File getFile() {
-      return file;
-    }
-
-    public long getTxId() {
-      return txId;
-    }
-    
-    @Override
-    public String toString() {
-      return file.toString();
-    }
-  }
   
-  /**
-   * Record of an edit log that has been located and had its filename parsed.
-   */
-  static class FoundEditLog {
-    File file;
-    final long startTxId;
-    long lastTxId;
-    
-    private EditLogValidation cachedValidation = null;
-    private boolean isCorrupt = false;
-    
-    static final long UNKNOWN_END = -1;
-    
-    FoundEditLog(File file,
-        long startTxId, long endTxId) {
-      assert endTxId == UNKNOWN_END || endTxId >= startTxId;
-      assert startTxId > 0;
-      assert file != null;
-      
-      this.startTxId = startTxId;
-      this.lastTxId = endTxId;
-      this.file = file;
-    }
-    
-    public void finalizeLog() throws IOException {
-      long numTransactions = validateLog().numTransactions;
-      long lastTxId = startTxId + numTransactions - 1;
-      File dst = new File(file.getParentFile(),
-          NNStorage.getFinalizedEditsFileName(startTxId, lastTxId));
-      LOG.info("Finalizing edits log " + file + " by renaming to "
-          + dst.getName());
-      if (!file.renameTo(dst)) {
-        throw new IOException("Couldn't finalize log " +
-            file + " to " + dst);
-      }
-      this.lastTxId = lastTxId;
-      file = dst;
-    }
-
-    long getStartTxId() {
-      return startTxId;
-    }
-    
-    long getLastTxId() {
-      return lastTxId;
-    }
-
-    EditLogValidation validateLog() throws IOException {
-      if (cachedValidation == null) {
-        cachedValidation = FSEditLogLoader.validateEditLog(file);
-      }
-      return cachedValidation;
-    }
-
-    boolean isInProgress() {
-      return (lastTxId == UNKNOWN_END);
-    }
-
-    File getFile() {
-      return file;
-    }
-    
-    void markCorrupt() {
-      isCorrupt = true;
-    }
-    
-    boolean isCorrupt() {
-      return isCorrupt;
-    }
-
-    void moveAsideCorruptFile() throws IOException {
-      assert isCorrupt;
-    
-      File src = file;
-      File dst = new File(src.getParent(), src.getName() + ".corrupt");
-      boolean success = src.renameTo(dst);
-      if (!success) {
-        throw new IOException(
-          "Couldn't rename corrupt log " + src + " to " + dst);
-      }
-      file = dst;
-    }
-    
-    @Override
-    public String toString() {
-      return file.toString();
-    }
-  }
-
   static class TransactionalLoadPlan extends LoadPlan {
-    final FoundFSImage image;
+    final FSImageFile image;
     final LogLoadPlan logPlan;
     
-    public TransactionalLoadPlan(FoundFSImage image,
+    public TransactionalLoadPlan(FSImageFile image,
         LogLoadPlan logPlan) {
       super();
       this.image = image;
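
The FoundFSImage and FoundEditLog records deleted above survive as
FSImageFile and FileJournalManager.EditLogFile. Judging purely from the
call sites in this patch, the replacement presumably keeps a surface along
these lines; a reconstruction from usage, not the actual class:

    import java.io.File;
    import java.io.IOException;

    // Sketch reconstructed from how EditLogFile is used in this patch;
    // the real class lives in FileJournalManager and may differ.
    interface EditLogFileSketch {
      long getFirstTxId();     // was FoundEditLog.startTxId
      long getLastTxId();      // unknown while the segment is in progress
      boolean isInProgress();
      boolean isCorrupt();
      void markCorrupt();
      File getFile();
      void moveAsideCorruptFile() throws IOException; // rename to "<name>.corrupt"
      // plus a validateLog() returning a transaction count, used by the
      // recovery planning above
    }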
@@ -662,10 +489,10 @@ class FSImageTransactionalStorageInspect
   }
   
   static class LogLoadPlan {
-    final List<FoundEditLog> editLogs;
+    final List<EditLogFile> editLogs;
     final List<LogGroup> logGroupsToRecover;
     
-    LogLoadPlan(List<FoundEditLog> editLogs,
+    LogLoadPlan(List<EditLogFile> editLogs,
         List<LogGroup> logGroupsToRecover) {
       this.editLogs = editLogs;
       this.logGroupsToRecover = logGroupsToRecover;
@@ -679,7 +506,7 @@ class FSImageTransactionalStorageInspect
 
     public List<File> getEditsFiles() {
       List<File> ret = new ArrayList<File>();
-      for (FoundEditLog log : editLogs) {
+      for (EditLogFile log : editLogs) {
         ret.add(log.getFile());
       }
       return ret;
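
One detail worth noting in the surviving createLogLoadPlan: log groups are
keyed by start txid in a TreeMap, and everything already covered by the
chosen image is trimmed with SortedMap.tailMap. A standalone illustration
of that selection, with txids and file names invented for the demo:

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class TailMapDemo {
      public static void main(String[] args) {
        // Log groups keyed by their start txid, as in logGroups above.
        SortedMap<Long, String> logGroups = new TreeMap<Long, String>();
        logGroups.put(1L,   "edits_1-100");
        logGroups.put(101L, "edits_101-200");
        logGroups.put(201L, "edits_inprogress_201");

        long sinceTxId = 100;              // txid of the image being loaded
        long expectedTxId = sinceTxId + 1;

        // Keep only groups starting at or after the first txid still needed.
        SortedMap<Long, String> tail = logGroups.tailMap(expectedTxId);
        System.out.println(tail.keySet()); // prints [101, 201]
      }
    }

The group for edits_1-100 is dropped because the image at txid 100 already
covers it; the size comparison in createLogLoadPlan notices when groups
were skipped that way.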

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 16 00:37:15 2011
@@ -42,7 +42,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -69,35 +68,36 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -107,15 +107,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
@@ -151,8 +149,8 @@ import org.mortbay.util.ajax.JSON;
  ***************************************************/
 @InterfaceAudience.Private
 @Metrics(context="dfs")
-public class FSNamesystem implements FSConstants, FSNamesystemMBean,
-    FSClusterStats, NameNodeMXBean {
+public class FSNamesystem implements RwLock, FSClusterStats,
+    FSNamesystemMBean, NameNodeMXBean {
   static final Log LOG = LogFactory.getLog(FSNamesystem.class);
 
   private static final ThreadLocal<StringBuilder> auditBuffer =
@@ -207,9 +205,6 @@ public class FSNamesystem implements FSC
   private PermissionStatus defaultPermission;
   // FSNamesystemMetrics counter variables
   @Metric private MutableCounterInt expiredHeartbeats;
-  private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
-  private long blockPoolUsed = 0L;
-  private int totalLoad = 0;
   
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@@ -219,27 +214,16 @@ public class FSNamesystem implements FSC
   //
   // Stores the correct file name hierarchy
   //
-  public FSDirectory dir;
+  FSDirectory dir;
   private BlockManager blockManager;
-  
-  // Block pool ID used by this namenode
-  String blockPoolId;
+  private DatanodeStatistics datanodeStatistics;
 
-  /**
-   * Stores a subset of datanodeMap, containing nodes that are considered alive.
-   * The HeartbeatMonitor periodically checks for out-dated entries,
-   * and removes them from the list.
-   */
-  public ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
+  // Block pool ID used by this namenode
+  private String blockPoolId;
 
-  public LeaseManager leaseManager = new LeaseManager(this); 
+  LeaseManager leaseManager = new LeaseManager(this); 
 
-  //
-  // Threaded object that checks to see if we have been
-  // getting heartbeats from all clients. 
-  //
-  Daemon hbthread = null;   // HeartbeatMonitor thread
-  public Daemon lmthread = null;   // LeaseMonitor thread
+  Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
   
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@@ -248,9 +232,6 @@ public class FSNamesystem implements FSC
   private volatile boolean fsRunning = true;
   long systemStart = 0;
 
-  // heartbeatRecheckInterval is how often namenode checks for expired datanodes
-  private long heartbeatRecheckInterval;
-
   //resourceRecheckInterval is how often namenode checks for the disk space availability
   private long resourceRecheckInterval;
 
@@ -296,13 +277,14 @@ public class FSNamesystem implements FSC
    */
   private void initialize(Configuration conf, FSImage fsImage)
       throws IOException {
-    resourceRecheckInterval =
-        conf.getLong(DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
-        DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
+    resourceRecheckInterval = conf.getLong(
+        DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
+        DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
     nnResourceChecker = new NameNodeResourceChecker(conf);
     checkAvailableResources();
     this.systemStart = now();
     this.blockManager = new BlockManager(this, conf);
+    this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
     this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
     dtSecretManager = createDelegationTokenSecretManager(conf);
@@ -331,26 +313,28 @@ public class FSNamesystem implements FSC
    * Activate FSNamesystem daemons.
    */
   void activate(Configuration conf) throws IOException {
-    setBlockTotal();
-    blockManager.activate(conf);
-    this.hbthread = new Daemon(new HeartbeatMonitor());
-    this.lmthread = new Daemon(leaseManager.new Monitor());
-    
-    hbthread.start();
-    lmthread.start();
-
-    this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
-    nnrmthread.start();
+    writeLock();
+    try {
+      setBlockTotal();
+      blockManager.activate(conf);
 
+      this.lmthread = new Daemon(leaseManager.new Monitor());
+      lmthread.start();
+      this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
+      nnrmthread.start();
+    } finally {
+      writeUnlock();
+    }
+    
     registerMXBean();
     DefaultMetricsSystem.instance().register(this);
   }
 
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
-    return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+    return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
 
-  public static Collection<URI> getStorageDirs(Configuration conf,
+  private static Collection<URI> getStorageDirs(Configuration conf,
                                                 String propertyName) {
     Collection<String> dirNames = conf.getTrimmedStringCollection(propertyName);
     StartupOption startOpt = NameNode.getStartupOption(conf);
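
The activate() change above moves daemon startup under the namesystem
write lock. Stripped of the HDFS specifics, the idiom is the standard
try/finally around a ReentrantReadWriteLock; a sketch with a placeholder
body:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockIdiom {
      // Fair ordering, matching "new ReentrantReadWriteLock(true)"
      // in initialize() above.
      private final ReentrantReadWriteLock fsLock =
          new ReentrantReadWriteLock(true);

      void activateLike() {
        fsLock.writeLock().lock();
        try {
          // setBlockTotal(), blockManager.activate(conf), daemon starts...
        } finally {
          fsLock.writeLock().unlock(); // released even if startup throws
        }
      }
    }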
@@ -381,35 +365,35 @@ public class FSNamesystem implements FSC
   }
 
   public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
-    return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
+    return getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
   }
 
-  // utility methods to acquire and release read lock and write lock
+  @Override
   public void readLock() {
     this.fsLock.readLock().lock();
   }
-
+  @Override
   public void readUnlock() {
     this.fsLock.readLock().unlock();
   }
-
+  @Override
   public void writeLock() {
     this.fsLock.writeLock().lock();
   }
-
+  @Override
   public void writeUnlock() {
     this.fsLock.writeLock().unlock();
   }
-
+  @Override
   public boolean hasWriteLock() {
     return this.fsLock.isWriteLockedByCurrentThread();
   }
-
-  boolean hasReadLock() {
+  @Override
+  public boolean hasReadLock() {
     return this.fsLock.getReadHoldCount() > 0;
   }
-
-  boolean hasReadOrWriteLock() {
+  @Override
+  public boolean hasReadOrWriteLock() {
     return hasReadLock() || hasWriteLock();
   }
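
These accessors now implement org.apache.hadoop.hdfs.util.RwLock, imported
earlier in this patch. Going by the methods marked @Override, the
interface presumably declares roughly the following; a reconstruction, not
the actual source:

    // Reconstructed from the @Override'd methods above.
    interface RwLockSketch {
      void readLock();
      void readUnlock();
      void writeLock();
      void writeUnlock();
      boolean hasWriteLock();
      boolean hasReadLock();
      boolean hasReadOrWriteLock();
    }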
 
@@ -453,33 +437,30 @@ public class FSNamesystem implements FSC
     
     LOG.info("fsOwner=" + fsOwner);
 
-    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
-                               DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-    this.isPermissionEnabled = conf.getBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
-                                               DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
+    this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
+                               DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+    this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
+                                               DFS_PERMISSIONS_ENABLED_DEFAULT);
     LOG.info("supergroup=" + supergroup);
     LOG.info("isPermissionEnabled=" + isPermissionEnabled);
-    short filePermission = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
-                                              DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
+    short filePermission = (short)conf.getInt(DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
+                                              DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
-
-    this.heartbeatRecheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
     
     this.serverDefaults = new FsServerDefaults(
-        conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
-        conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DEFAULT_WRITE_PACKET_SIZE),
-        (short) conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DEFAULT_REPLICATION_FACTOR),
-        conf.getInt("io.file.buffer.size", DEFAULT_FILE_BUFFER_SIZE));
-    this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY, 
-                                     DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
-
-    this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
-    this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
-                                      DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
+        conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
+        conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
+        conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
+        (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+    
+    this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
+                                     DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
+
+    this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
+    this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY,
+        DFS_SUPPORT_APPEND_DEFAULT);
 
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
   }
@@ -497,7 +478,8 @@ public class FSNamesystem implements FSC
     try {
       return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
           getClusterId(), getBlockPoolId(),
-          dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
+          dir.fsImage.getStorage().getCTime(),
+          upgradeManager.getUpgradeVersion());
     } finally {
       readUnlock();
     }
@@ -508,11 +490,10 @@ public class FSNamesystem implements FSC
    * Causes heartbeat and lease daemons to stop; waits briefly for
    * them to finish, but a short timeout returns control back to caller.
    */
-  public void close() {
+  void close() {
     fsRunning = false;
     try {
       if (blockManager != null) blockManager.close();
-      if (hbthread != null) hbthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
       if (nnrmthread != null) nnrmthread.interrupt();
@@ -554,14 +535,12 @@ public class FSNamesystem implements FSC
   
       long totalInodes = this.dir.totalInodes();
       long totalBlocks = this.getBlocksTotal();
-  
-      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-      this.DFSNodesStatus(live, dead);
-      
-      String str = totalInodes + " files and directories, " + totalBlocks
-          + " blocks = " + (totalInodes + totalBlocks) + " total";
-      out.println(str);
+      out.println(totalInodes + " files and directories, " + totalBlocks
+          + " blocks = " + (totalInodes + totalBlocks) + " total");
+
+      final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      blockManager.getDatanodeManager().fetchDatanodes(live, dead, false);
       out.println("Live Datanodes: "+live.size());
       out.println("Dead Datanodes: "+dead.size());
       blockManager.metaSave(out);
@@ -591,30 +570,6 @@ public class FSNamesystem implements FSC
 
   /////////////////////////////////////////////////////////
   //
-  // These methods are called by secondary namenodes
-  //
-  /////////////////////////////////////////////////////////
-  /**
-   * return a list of blocks & their locations on <code>datanode</code> whose
-   * total size is <code>size</code>
-   * 
-   * @param datanode on which blocks are located
-   * @param size total size of blocks
-   */
-  BlocksWithLocations getBlocks(DatanodeID datanode, long size)
-      throws IOException {
-    readLock();
-    try {
-      checkSuperuserPrivilege();
-      return blockManager.getBlocksWithLocations(datanode, size);  
-    } finally {
-      readUnlock();
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////
-  //
   // These methods are called by HadoopFS clients
   //
   /////////////////////////////////////////////////////////
@@ -622,7 +577,7 @@ public class FSNamesystem implements FSC
    * Set permissions for an existing file.
    * @throws IOException
    */
-  public void setPermission(String src, FsPermission permission)
+  void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
@@ -651,7 +606,7 @@ public class FSNamesystem implements FSC
    * Set owner for an existing file.
    * @throws IOException
    */
-  public void setOwner(String src, String username, String group)
+  void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
@@ -770,7 +725,9 @@ public class FSNamesystem implements FSC
           }
           dir.setTimes(src, inode, -1, now, false);
         }
-        return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+        return blockManager.createLocatedBlocks(inode.getBlocks(),
+            inode.computeFileSize(false), inode.isUnderConstruction(),
+            offset, length, needBlockToken);
       } finally {
         if (attempt == 0) {
           readUnlock();
@@ -781,50 +738,6 @@ public class FSNamesystem implements FSC
     }
     return null; // can never reach here
   }
-  
-  LocatedBlocks getBlockLocationsInternal(INodeFile inode,
-      long offset, long length, boolean needBlockToken)
-  throws IOException {
-    assert hasReadOrWriteLock();
-    final BlockInfo[] blocks = inode.getBlocks();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
-    }
-    if (blocks == null) {
-      return null;
-    }
-
-    if (blocks.length == 0) {
-      return new LocatedBlocks(0, inode.isUnderConstruction(),
-          Collections.<LocatedBlock>emptyList(), null, false);
-    } else {
-      final long n = inode.computeFileSize(false);
-      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
-          blocks, offset, length, Integer.MAX_VALUE);
-      final BlockInfo last = inode.getLastBlock();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("last = " + last);
-      }
-      
-      LocatedBlock lastBlock = last.isComplete() ? blockManager
-          .getBlockLocation(last, n - last.getNumBytes()) : blockManager
-          .getBlockLocation(last, n);
-          
-      if (blockManager.isBlockTokenEnabled() && needBlockToken) {
-        blockManager.setBlockTokens(locatedblocks);
-        blockManager.setBlockToken(lastBlock);
-      }
-      return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-          lastBlock, last.isComplete());
-    }
-  }
-
-  /** Create a LocatedBlock. */
-  public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
-      final long offset, final boolean corrupt) throws IOException {
-    return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
-  }
-  
 
   /**
    * Moves all the blocks from srcs and appends them to trg
@@ -834,7 +747,7 @@ public class FSNamesystem implements FSC
    * @param srcs
    * @throws IOException
    */
-  public void concat(String target, String [] srcs) 
+  void concat(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
@@ -882,7 +795,7 @@ public class FSNamesystem implements FSC
   }
 
   /** See {@link #concat(String, String[])} */
-  public void concatInternal(String target, String [] srcs) 
+  private void concatInternal(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
 
@@ -986,7 +899,7 @@ public class FSNamesystem implements FSC
   * The access time is precise up to an hour. The transaction, if needed, is
    * written to the edits log but is not flushed.
    */
-  public void setTimes(String src, long mtime, long atime) 
+  void setTimes(String src, long mtime, long atime) 
     throws IOException, UnresolvedLinkException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
@@ -1018,7 +931,7 @@ public class FSNamesystem implements FSC
   /**
    * Create a symbolic link.
    */
-  public void createSymlink(String target, String link,
+  void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
@@ -1086,60 +999,37 @@ public class FSNamesystem implements FSC
    * @return true if successful; 
    *         false if file does not exist or is a directory
    */
-  public boolean setReplication(String src, short replication) 
-    throws IOException, UnresolvedLinkException {
-    boolean status = false;
+  boolean setReplication(final String src, final short replication
+      ) throws IOException {
+    blockManager.verifyReplication(src, replication, null);
+
+    final boolean isFile;
     writeLock();
     try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
-      status = setReplicationInternal(src, replication);
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+
+      final short[] oldReplication = new short[1];
+      final Block[] blocks = dir.setReplication(src, replication, oldReplication);
+      isFile = blocks != null;
+      if (isFile) {
+        blockManager.setReplication(oldReplication[0], replication, src, blocks);
+      }
     } finally {
       writeUnlock();
     }
+
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setReplication", src, null, null);
     }
-    return status;
-  }
-
-  private boolean setReplicationInternal(String src,
-      short replication) throws AccessControlException, QuotaExceededException,
-      SafeModeException, UnresolvedLinkException, IOException {
-    assert hasWriteLock();
-    blockManager.verifyReplication(src, replication, null);
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-
-    int[] oldReplication = new int[1];
-    Block[] fileBlocks;
-    fileBlocks = dir.setReplication(src, replication, oldReplication);
-    if (fileBlocks == null)  // file not found or is a directory
-      return false;
-    int oldRepl = oldReplication[0];
-    if (oldRepl == replication) // the same replication
-      return true;
-
-    // update needReplication priority queues
-    for(int idx = 0; idx < fileBlocks.length; idx++)
-      blockManager.updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
-      
-    if (oldRepl > replication) {  
-      // old replication > the new one; need to remove copies
-      LOG.info("Reducing replication for file " + src 
-               + ". New replication is " + replication);
-      for(int idx = 0; idx < fileBlocks.length; idx++)
-        blockManager.processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
-    } else { // replication factor is increased
-      LOG.info("Increasing replication for file " + src 
-          + ". New replication is " + replication);
-    }
-    return true;
+    return isFile;
   }
     
   long getPreferredBlockSize(String filename) 
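
The rewritten setReplication also illustrates the locking discipline used
throughout this file: mutate under the write lock, release it, then call
getEditLog().logSync() and the audit logger outside the lock, presumably
so other operations are not serialized behind the edit-log flush. A
standalone skeleton, with the namesystem state faked by a boolean:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class WriteThenSync {
      private final ReentrantReadWriteLock fsLock =
          new ReentrantReadWriteLock(true);
      private boolean isFile;

      boolean setReplicationLike() {
        fsLock.writeLock().lock();
        try {
          isFile = true; // stands in for dir.setReplication(...)
        } finally {
          fsLock.writeLock().unlock();
        }
        // logSync() and audit logging happen here, after the lock is gone.
        return isFile;
      }
    }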
@@ -1313,9 +1203,8 @@ public class FSNamesystem implements FSC
         LocatedBlock lb = 
           blockManager.convertLastBlockToUnderConstruction(cons);
 
-        if (lb != null && blockManager.isBlockTokenEnabled()) {
-          lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(), 
-              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
+        if (lb != null) {
+          blockManager.setBlockToken(lb, AccessMode.WRITE);
         }
         return lb;
       } else {
@@ -1482,7 +1371,7 @@ public class FSNamesystem implements FSC
     try {
       lb = startFileInternal(src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
-                        false, (short)blockManager.maxReplication, (long)0);
+                        false, blockManager.maxReplication, (long)0);
     } finally {
       writeUnlock();
     }
@@ -1522,7 +1411,7 @@ public class FSNamesystem implements FSC
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public LocatedBlock getAdditionalBlock(String src,
+  LocatedBlock getAdditionalBlock(String src,
                                          String clientName,
                                          ExtendedBlock previous,
                                          HashMap<Node, Node> excludedNodes
@@ -1603,10 +1492,7 @@ public class FSNamesystem implements FSC
 
     // Create next block
     LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
-    if (blockManager.isBlockTokenEnabled()) {
-      b.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(b.getBlock(), 
-          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
-    }
+    blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
     return b;
   }
 
@@ -1652,17 +1538,14 @@ public class FSNamesystem implements FSC
         ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
         excludes, preferredblocksize);
     final LocatedBlock lb = new LocatedBlock(blk, targets);
-    if (blockManager.isBlockTokenEnabled()) {
-      lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(), 
-          EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
-    }
+    blockManager.setBlockToken(lb, AccessMode.COPY);
     return lb;
   }
 
   /**
    * The client would like to let go of the given block
    */
-  public boolean abandonBlock(ExtendedBlock b, String src, String holder)
+  boolean abandonBlock(ExtendedBlock b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
     writeLock();
@@ -1731,7 +1614,7 @@ public class FSNamesystem implements FSC
    *         (e.g if not all blocks have reached minimum replication yet)
    * @throws IOException on error (eg lease mismatch, file not open, file deleted)
    */
-  public boolean completeFile(String src, String holder, ExtendedBlock last) 
+  boolean completeFile(String src, String holder, ExtendedBlock last) 
     throws SafeModeException, UnresolvedLinkException, IOException {
     checkBlock(last);
     boolean success = false;
@@ -1847,23 +1730,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-
-  /**
-   * Mark the block belonging to datanode as corrupt
-   * @param blk Block to be marked as corrupt
-   * @param dn Datanode which holds the corrupt replica
-   */
-  public void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
-    throws IOException {
-    writeLock();
-    try {
-      blockManager.findAndMarkBlockAsCorrupt(blk.getLocalBlock(), dn);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-
   ////////////////////////////////////////////////////////////////
   // Here's how to handle block-copy failure during client write:
   // -- As usual, the client's write should result in a streaming
@@ -1988,7 +1854,7 @@ public class FSNamesystem implements FSC
   * @see ClientProtocol#delete(String, boolean) for detailed description and 
    * description of exceptions
    */
-    public boolean delete(String src, boolean recursive)
+    boolean delete(String src, boolean recursive)
         throws AccessControlException, SafeModeException,
                UnresolvedLinkException, IOException {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2118,7 +1984,7 @@ public class FSNamesystem implements FSC
   /**
    * Create all the necessary directories
    */
-  public boolean mkdirs(String src, PermissionStatus permissions,
+  boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
     boolean status = false;
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2374,7 +2240,7 @@ public class FSNamesystem implements FSC
     return false;
   }
 
-  Lease reassignLease(Lease lease, String src, String newHolder,
+  private Lease reassignLease(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
     assert hasWriteLock();
     if(newHolder == null)
@@ -2390,6 +2256,22 @@ public class FSNamesystem implements FSC
     return leaseManager.reassignLease(lease, src, newHolder);
   }
 
+  /** Update disk space consumed. */
+  public void updateDiskSpaceConsumed(final INodeFileUnderConstruction fileINode,
+      final Block commitBlock) throws IOException {
+    assert hasWriteLock();
+
+    // Adjust disk space consumption if required
+    final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
+    if (diff > 0) {
+      try {
+        String path = leaseManager.findPath(fileINode);
+        dir.updateSpaceConsumed(path, 0, -diff * fileINode.getReplication());
+      } catch (IOException e) {
+        LOG.warn("Unexpected exception while updating disk space.", e);
+      }
+    }
+  }
 
   private void finalizeINodeFileUnderConstruction(String src, 
       INodeFileUnderConstruction pendingFile) 
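
The arithmetic in the new updateDiskSpaceConsumed: quota is charged for a
full preferred block when the block is allocated, so a block that commits
short hands back diff * replication bytes as a negative adjustment. In
numbers, with values invented for the demo:

    public class SpaceRefundDemo {
      public static void main(String[] args) {
        long preferredBlockSize = 128L << 20; // 128 MB charged at allocation
        long committedBytes     =  64L << 20; // block finalized at 64 MB
        short replication = 3;

        long diff = preferredBlockSize - committedBytes;
        if (diff > 0) {
          // Mirrors dir.updateSpaceConsumed(path, 0, -diff * replication).
          System.out.println("adjustment: " + (-diff * replication)
              + " bytes");                    // -201326592, i.e. -192 MB
        }
      }
    }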
@@ -2536,7 +2418,7 @@ public class FSNamesystem implements FSC
    * @throws UnresolvedLinkException if symbolic link is encountered
    * @throws IOException if other I/O error occurred
    */
-  public DirectoryListing getListing(String src, byte[] startAfter,
+  DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
@@ -2589,8 +2471,7 @@ public class FSNamesystem implements FSC
    * 
    * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
-  public void registerDatanode(DatanodeRegistration nodeReg)
-      throws IOException {
+  void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
     writeLock();
     try {
       getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
@@ -2606,7 +2487,7 @@ public class FSNamesystem implements FSC
    * @see #registerDatanode(DatanodeRegistration)
    * @return registration ID
    */
-  public String getRegistrationID() {
+  String getRegistrationID() {
     return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
 
@@ -2621,13 +2502,14 @@ public class FSNamesystem implements FSC
    * @return an array of datanode commands 
    * @throws IOException
    */
-  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+  DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes) 
         throws IOException {
     readLock();
     try {
-      final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
+      final int maxTransfer = blockManager.getMaxReplicationStreams()
+          - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
           xceiverCount, maxTransfer, failedVolumes);
@@ -2636,7 +2518,7 @@ public class FSNamesystem implements FSC
       }
 
       //check distributed upgrade
-      DatanodeCommand cmd = getDistributedUpgradeCommand();
+      DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
       if (cmd != null) {
         return new DatanodeCommand[] {cmd};
       }
@@ -2646,45 +2528,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-  public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
-      final DatanodeDescriptor nodeinfo) {
-    // check access key update
-    if (blockManager.isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
-      cmds.add(new KeyUpdateCommand(blockManager.getBlockTokenSecretManager().exportKeys()));
-      nodeinfo.needKeyUpdate = false;
-    }
-  }
-
-  public void updateStats(DatanodeDescriptor node, boolean isAdded) {
-    //
-    // The statistics are protected by the heartbeat lock
-    // For decommissioning/decommissioned nodes, only used capacity
-    // is counted.
-    //
-    assert(Thread.holdsLock(heartbeats));
-    if (isAdded) {
-      capacityUsed += node.getDfsUsed();
-      blockPoolUsed += node.getBlockPoolUsed();
-      totalLoad += node.getXceiverCount();
-      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
-        capacityTotal += node.getCapacity();
-        capacityRemaining += node.getRemaining();
-      } else {
-        capacityTotal += node.getDfsUsed();
-      }
-    } else {
-      capacityUsed -= node.getDfsUsed();
-      blockPoolUsed -= node.getBlockPoolUsed();
-      totalLoad -= node.getXceiverCount();
-      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
-        capacityTotal -= node.getCapacity();
-        capacityRemaining -= node.getRemaining();
-      } else {
-        capacityTotal -= node.getDfsUsed();
-      }
-    }
-  }
-
   /**
    * Returns whether or not there were available resources at the last check of
    * resources.
@@ -2735,314 +2578,14 @@ public class FSNamesystem implements FSC
       }
     }
   }
-
-
-  /**
-   * Periodically calls heartbeatCheck() and updateBlockKey()
-   */
-  class HeartbeatMonitor implements Runnable {
-    private long lastHeartbeatCheck;
-    private long lastBlockKeyUpdate;
-    /**
-     */
-    public void run() {
-      while (fsRunning) {
-        try {
-          long now = now();
-          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
-            heartbeatCheck();
-            lastHeartbeatCheck = now;
-          }
-          if (blockManager.isBlockTokenEnabled()
-              && (lastBlockKeyUpdate + blockManager.getBlockKeyUpdateInterval() < now)) {
-            blockManager.updateBlockKey();
-            lastBlockKeyUpdate = now;
-          }
-        } catch (Exception e) {
-          FSNamesystem.LOG.error("Exception while checking heartbeat", e);
-        }
-        try {
-          Thread.sleep(5000);  // 5 seconds
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
-
- 
-  public void setNodeReplicationLimit(int limit) {
-    blockManager.maxReplicationStreams = limit;
-  }
-
-  /**
-   * Remove a datanode descriptor.
-   * @param nodeID datanode ID.
-   * @throws UnregisteredNodeException 
-   */
-  public void removeDatanode(final DatanodeID nodeID
-      ) throws UnregisteredNodeException {
-    writeLock();
-    try {
-      DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
-          ).getDatanode(nodeID);
-      if (nodeInfo != null) {
-        removeDatanode(nodeInfo);
-      } else {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                     + nodeID.getName() + " does not exist");
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
   
-  /**
-   * Remove a datanode descriptor.
-   * @param nodeInfo datanode descriptor.
-   */
-  public void removeDatanode(DatanodeDescriptor nodeInfo) {
-    assert hasWriteLock();
-    synchronized (heartbeats) {
-      if (nodeInfo.isAlive) {
-        updateStats(nodeInfo, false);
-        heartbeats.remove(nodeInfo);
-        nodeInfo.isAlive = false;
-      }
-    }
-
-    blockManager.removeDatanode(nodeInfo);
-
-    checkSafeMode();
-  }
-
   FSImage getFSImage() {
     return dir.fsImage;
   }
 
   FSEditLog getEditLog() {
     return getFSImage().getEditLog();
-  }
-
-  /**
-   * Check if there are any expired heartbeats, and if so,
-   * whether any blocks have to be re-replicated.
-   * While removing dead datanodes, make sure that only one datanode is marked
-   * dead at a time within the synchronized section. Otherwise, a cascading
-   * effect causes more datanodes to be declared dead.
-   */
-  void heartbeatCheck() {
-    final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
-    // It's OK to check safe mode w/o taking the lock here, we re-check
-    // for safe mode after taking the lock before removing a datanode.
-    if (isInSafeMode()) {
-      return;
-    }
-    boolean allAlive = false;
-    while (!allAlive) {
-      boolean foundDead = false;
-      DatanodeID nodeID = null;
-
-      // locate the first dead node.
-      synchronized(heartbeats) {
-        for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
-             it.hasNext();) {
-          DatanodeDescriptor nodeInfo = it.next();
-          if (datanodeManager.isDatanodeDead(nodeInfo)) {
-            expiredHeartbeats.incr();
-            foundDead = true;
-            nodeID = nodeInfo;
-            break;
-          }
-        }
-      }
-
-      // acquire the fsnamesystem lock, and then remove the dead node.
-      if (foundDead) {
-        writeLock();
-        if (isInSafeMode()) {
-          return;
-        }
-        try {
-          datanodeManager.removeDeadDatanode(nodeID);
-        } finally {
-          writeUnlock();
-        }
-      }
-      allAlive = !foundDead;
-    }
-  }
-    
-  /**
-   * The given node is reporting all its blocks.  Use this info to 
-   * update the (machine-->blocklist) and (block-->machinelist) tables.
-   */
-  public void processReport(DatanodeID nodeID, String poolId,
-      BlockListAsLongs newReport) throws IOException {
-    long startTime, endTime;
-
-    writeLock();
-    startTime = now(); //after acquiring write lock
-    try {
-      final DatanodeDescriptor node = blockManager.getDatanodeManager(
-          ).getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        throw new IOException("ProcessReport from dead or unregistered node: "
-                              + nodeID.getName());
-      }
-      // To minimize startup time, we discard any second (or later) block reports
-      // that we receive while still in startup phase.
-      if (isInStartupSafeMode() && node.numBlocks() > 0) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
-            + "discarded non-initial block report from " + nodeID.getName()
-            + " because namenode still in startup phase");
-        return;
-      }
-  
-      blockManager.processReport(node, newReport);
-    } finally {
-      endTime = now();
-      writeUnlock();
-    }
-
-    // Log the block report processing stats from Namenode perspective
-    NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
-        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
-        + ", processing time: " + (endTime - startTime) + " msecs");
-  }
-
-  /**
-   * We want "replication" replicas for the block, but we now have too many.
-   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
-   *
-   * srcNodes.size() - dstNodes.size() == replication
-   *
-   * We pick nodes to make sure that replicas are spread across racks, and we
-   * also try hard to pick one with the least free space.
-   * The algorithm first picks a node with the least free space from the nodes
-   * that are on a rack holding more than one replica of the block, so that
-   * removing such a replica does not remove a rack.
-   * If no such node is available, it picks a node with the least free space.
-   */
-  public void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
-                              Block b, short replication,
-                              DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint,
-                              BlockPlacementPolicy replicator) {
-    assert hasWriteLock();
-    // first form a rack-to-datanodes map
-    INodeFile inode = blockManager.getINode(b);
-    HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
-      new HashMap<String, ArrayList<DatanodeDescriptor>>();
-    for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
-         iter.hasNext();) {
-      DatanodeDescriptor node = iter.next();
-      String rackName = node.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if(datanodeList==null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-      }
-      datanodeList.add(node);
-      rackMap.put(rackName, datanodeList);
-    }
-    
-    // split the nodes into two sets:
-    // priSet contains nodes on racks with more than one replica;
-    // remains contains the remaining nodes
-    ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
-    for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter = 
-      rackMap.entrySet().iterator(); iter.hasNext(); ) {
-      Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
-      ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue(); 
-      if( datanodeList.size() == 1 ) {
-        remains.add(datanodeList.get(0));
-      } else {
-        priSet.addAll(datanodeList);
-      }
-    }
-    
-    // pick one node to delete that favors the delete hint;
-    // otherwise pick one with the least space from priSet if it is not empty;
-    // otherwise pick one with the least space from remains
-    boolean firstOne = true;
-    while (nonExcess.size() - replication > 0) {
-      DatanodeInfo cur = null;
-
-      // check if we can delete delNodeHint
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
-            (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
-          cur = delNodeHint;
-      } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
-      }
-
-      firstOne = false;
-      // adjust rackmap, priSet, and remains
-      String rack = cur.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
-      datanodes.remove(cur);
-      if(datanodes.isEmpty()) {
-        rackMap.remove(rack);
-      }
-      if( priSet.remove(cur) ) {
-        if (datanodes.size() == 1) {
-          priSet.remove(datanodes.get(0));
-          remains.add(datanodes.get(0));
-        }
-      } else {
-        remains.remove(cur);
-      }
-
-      nonExcess.remove(cur);
-      blockManager.addToExcessReplicate(cur, b);
-
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.
-      //
-      // The 'invalidate' list is used to inform the datanode that the block
-      // should be deleted.  Items are removed from the invalidate list
-      // upon giving instructions to the datanode.
-      //
-      blockManager.addToInvalidates(b, cur);
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
-                +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
-    }
-  }
-
-
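For reference, the selection policy described in the comment above can be distilled into a small standalone sketch. Everything here is hypothetical (the Node type, the free-space field); the committed code works on DatanodeDescriptor and delegates the final pick to BlockPlacementPolicy.chooseReplicaToDelete.

    import java.util.*;

    // Standalone sketch of rack-aware excess-replica selection; not the
    // committed code.
    class ExcessReplicaSketch {
      static class Node {
        final String rack;
        final long remaining;              // free space in bytes
        Node(String rack, long remaining) {
          this.rack = rack;
          this.remaining = remaining;
        }
      }

      /** Prefer a node on a rack holding more than one replica (removing
       *  it cannot lose a rack); break ties by least free space. */
      static Node chooseReplicaToDelete(Collection<Node> replicas) {
        // Group the replicas by rack.
        Map<String, List<Node>> rackMap = new HashMap<String, List<Node>>();
        for (Node n : replicas) {
          List<Node> onRack = rackMap.get(n.rack);
          if (onRack == null) {
            onRack = new ArrayList<Node>();
            rackMap.put(n.rack, onRack);
          }
          onRack.add(n);
        }
        List<Node> priSet = new ArrayList<Node>();   // racks with > 1 replica
        List<Node> remains = new ArrayList<Node>();  // single-replica racks
        for (List<Node> onRack : rackMap.values()) {
          if (onRack.size() > 1) {
            priSet.addAll(onRack);
          } else {
            remains.addAll(onRack);
          }
        }
        // Prefer priSet; fall back to remains. Pick the least free space.
        Node best = null;
        for (Node n : (priSet.isEmpty() ? remains : priSet)) {
          if (best == null || n.remaining < best.remaining) {
            best = n;
          }
        }
        return best;
      }

      public static void main(String[] args) {
        List<Node> replicas = Arrays.asList(
            new Node("/rackA", 10L << 30),
            new Node("/rackA", 2L << 30),   // least space on a 2-replica rack
            new Node("/rackB", 1L << 30));
        // Prints /rackA: dropping that replica still leaves both racks covered.
        System.out.println(chooseReplicaToDelete(replicas).rack);
      }
    }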
-  /**
-   * The given node is reporting that it received a certain block.
-   */
-  public void blockReceived(DatanodeID nodeID,  
-                                         String poolId,
-                                         Block block,
-                                         String delHint
-                                         ) throws IOException {
-    writeLock();
-    try {
-      final DatanodeDescriptor node = blockManager.getDatanodeManager(
-          ).getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
-            + " is received from dead or unregistered node " + nodeID.getName());
-        throw new IOException("Got blockReceived message for " + block
-            + " from unregistered or dead node");
-      }
-          
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
-                                      +block+" is received from " + nodeID.getName());
-      }
-  
-      blockManager.addBlock(node, block, delHint);
-    } finally {
-      writeUnlock();
-    }
-  }
+  }    
 
   private void checkBlock(ExtendedBlock block) throws IOException {
     if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
@@ -3057,15 +2600,18 @@ public class FSNamesystem implements FSC
     return blockManager.getMissingBlocksCount();
   }
   
+  /** Increment expired heartbeat counter. */
+  public void incrExpiredHeartbeats() {
+    expiredHeartbeats.incr();
+  }
+
+  /** @see ClientProtocol#getStats() */
   long[] getStats() {
-    synchronized(heartbeats) {
-      return new long[] {this.capacityTotal, this.capacityUsed, 
-                         this.capacityRemaining,
-                         getUnderReplicatedBlocks(),
-                         getCorruptReplicaBlocks(),
-                         getMissingBlocksCount(),
-                         getBlockPoolUsedSpace()};
-    }
+    final long[] stats = datanodeStatistics.getStats();
+    stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
+    stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
+    stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
+    return stats;
   }
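For context, the array returned by the new getStats() can be read back with the ClientProtocol index constants used above. The sketch below assumes the first three slots carry total, used, and remaining capacity, mirroring what datanodeStatistics.getStats() returns; only the names of the last three indices are visible in this change, and the numeric values are assumptions.

    // Hypothetical reader for the getStats() array; the 0..2 layout and all
    // numeric values are assumptions, the 3..5 names come from the change above.
    class GetStatsSketch {
      static final int CAPACITY_IDX = 0;          // assumed: total capacity
      static final int USED_IDX = 1;              // assumed: DFS used
      static final int REMAINING_IDX = 2;         // assumed: DFS remaining
      static final int UNDER_REPLICATED_IDX = 3;  // GET_STATS_UNDER_REPLICATED_IDX
      static final int CORRUPT_BLOCKS_IDX = 4;    // GET_STATS_CORRUPT_BLOCKS_IDX
      static final int MISSING_BLOCKS_IDX = 5;    // GET_STATS_MISSING_BLOCKS_IDX

      static String format(long[] stats) {
        return "capacity=" + stats[CAPACITY_IDX]
            + " used=" + stats[USED_IDX]
            + " remaining=" + stats[REMAINING_IDX]
            + " underReplicated=" + stats[UNDER_REPLICATED_IDX]
            + " corrupt=" + stats[CORRUPT_BLOCKS_IDX]
            + " missing=" + stats[MISSING_BLOCKS_IDX];
      }

      public static void main(String[] args) {
        System.out.println(format(new long[] {100L, 40L, 55L, 3L, 0L, 1L}));
      }
    }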
 
   /**
@@ -3073,9 +2619,7 @@ public class FSNamesystem implements FSC
    */
   @Override // FSNamesystemMBean
   public long getCapacityTotal() {
-    synchronized(heartbeats) {
-      return capacityTotal;
-    }
+    return datanodeStatistics.getCapacityTotal();
   }
 
   @Metric
@@ -3088,9 +2632,7 @@ public class FSNamesystem implements FSC
    */
   @Override // FSNamesystemMBean
   public long getCapacityUsed() {
-    synchronized(heartbeats) {
-      return capacityUsed;
-    }
+    return datanodeStatistics.getCapacityUsed();
   }
 
   @Metric
@@ -3098,32 +2640,9 @@ public class FSNamesystem implements FSC
     return DFSUtil.roundBytesToGB(getCapacityUsed());
   }
 
-  /**
-   * Total used space by data nodes as percentage of total capacity
-   */
-  public float getCapacityUsedPercent() {
-    synchronized(heartbeats){
-      return DFSUtil.getPercentUsed(capacityUsed, capacityTotal);
-    }
-  }
-  /**
-   * Total used space by data nodes for non DFS purposes such
-   * as storing temporary files on the local file system
-   */
-  public long getCapacityUsedNonDFS() {
-    long nonDFSUsed = 0;
-    synchronized(heartbeats){
-      nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
-    }
-    return nonDFSUsed < 0 ? 0 : nonDFSUsed;
-  }
-  /**
-   * Total non-used raw bytes.
-   */
+  @Override
   public long getCapacityRemaining() {
-    synchronized(heartbeats) {
-      return capacityRemaining;
-    }
+    return datanodeStatistics.getCapacityRemaining();
   }
 
   @Metric
@@ -3132,22 +2651,12 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * Total remaining space by data nodes as percentage of total capacity
-   */
-  public float getCapacityRemainingPercent() {
-    synchronized(heartbeats){
-      return DFSUtil.getPercentRemaining(capacityRemaining, capacityTotal);
-    }
-  }
-  /**
    * Total number of connections.
    */
   @Override // FSNamesystemMBean
   @Metric
   public int getTotalLoad() {
-    synchronized (heartbeats) {
-      return this.totalLoad;
-    }
+    return datanodeStatistics.getXceiverCount();
   }
 
   int getNumberOfDatanodes(DatanodeReportType type) {
@@ -3225,63 +2734,10 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /**
-   * Fetch the lists of live and dead datanodes.
-   */
-  public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
-                                          ArrayList<DatanodeDescriptor> dead) {
-    readLock();
-    try {
-      getBlockManager().getDatanodeManager().fetchDatanodess(live, dead);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  public Date getStartTime() {
+  Date getStartTime() {
     return new Date(systemStart); 
   }
     
-  short getMaxReplication()     { return (short)blockManager.maxReplication; }
-  short getMinReplication()     { return (short)blockManager.minReplication; }
-  short getDefaultReplication() { return (short)blockManager.defaultReplication; }
-
-  /**
-   * Clamp the specified replication between the minimum and maximum
-   * replication levels for this namesystem.
-   */
-  short adjustReplication(short replication) {
-    short minReplication = getMinReplication();
-    if (replication < minReplication) {
-      replication = minReplication;
-    }
-    short maxReplication = getMaxReplication();
-    if (replication > maxReplication) {
-      replication = maxReplication;
-    }
-    return replication;
-  }
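The same clamp can be written more compactly; a hypothetical equivalent inside the same class:

    // Sketch: equivalent clamp between the min and max replication levels.
    short adjustReplication(short replication) {
      return (short) Math.min(getMaxReplication(),
                              Math.max(getMinReplication(), replication));
    }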
-    
-  
-  /**
-   * Rereads the config to get hosts and exclude list file names.
-   * Rereads the files to update the hosts and exclude lists.  It
-   * checks if any of the hosts have changed states:
-   * 1. Added to hosts  --> no further work needed here.
-   * 2. Removed from hosts --> mark AdminState as decommissioned. 
-   * 3. Added to exclude --> start decommission.
-   * 4. Removed from exclude --> stop decommission.
-   */
-  public void refreshNodes(Configuration conf) throws IOException {
-    checkSuperuserPrivilege();
-    getBlockManager().getDatanodeManager().refreshHostsReader(conf);
-    writeLock();
-    try {
-      getBlockManager().getDatanodeManager().refreshDatanodes();
-    } finally {
-      writeUnlock();
-    }
-  }
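A condensed sketch of the four transitions enumerated above; the iteration and the helper predicates here are hypothetical, and the real pass lives in DatanodeManager.refreshDatanodes():

    // Hypothetical sketch of the refresh pass over registered datanodes.
    for (DatanodeDescriptor node : allRegisteredDatanodes()) {
      if (!inHostsList(node)) {
        markDecommissioned(node);    // case 2: removed from hosts
      } else if (inExcludeList(node)) {
        startDecommission(node);     // case 3: added to exclude
      } else {
        stopDecommission(node);      // cases 1 and 4: in hosts, not excluded
      }                              // (a no-op if not decommissioning)
    }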
-    
   void finalizeUpgrade() throws IOException {
     checkSuperuserPrivilege();
     getFSImage().finalizeUpgrade();
@@ -3351,16 +2807,17 @@ public class FSNamesystem implements FSC
      * @param conf configuration
      */
     SafeModeInfo(Configuration conf) {
-      this.threshold = conf.getFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
+      this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
+          DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
       this.datanodeThreshold = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
-        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
-      this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
-      this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 
-                                         DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
+        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
+      this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
+      this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 
+                                         DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
       // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
       this.replQueueThreshold = 
-        conf.getFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
+        conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
                       (float) threshold);
       this.blockTotal = 0; 
       this.blockSafe = 0;
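Taken together, these settings gate safe-mode exit roughly as sketched below; the real needEnter()/canLeave() logic in SafeModeInfo also accounts for the extension window and manual safe mode:

    // Sketch only: enough reported blocks and enough live datanodes.
    boolean thresholdsMet(long blockSafe, long blockTotal, int numLiveNodes) {
      boolean enoughBlocks = blockTotal == 0
          || blockSafe >= (long) (threshold * blockTotal);
      return enoughBlocks && numLiveNodes >= datanodeThreshold;
    }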
@@ -3429,7 +2886,7 @@ public class FSNamesystem implements FSC
         // verify whether a distributed upgrade needs to be started
         boolean needUpgrade = false;
         try {
-          needUpgrade = startDistributedUpgradeIfNeeded();
+          needUpgrade = upgradeManager.startUpgrade();
         } catch(IOException e) {
           FSNamesystem.LOG.error("IOException in startDistributedUpgradeIfNeeded", e);
         }
@@ -3458,7 +2915,7 @@ public class FSNamesystem implements FSC
           + nt.getNumOfRacks() + " racks and "
           + nt.getNumOfLeaves() + " datanodes");
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
-                                   +blockManager.neededReplications.size()+" blocks");
+          + blockManager.numOfUnderReplicatedBlocks() + " blocks");
     }
 
     /**
@@ -3622,10 +3079,10 @@ public class FSNamesystem implements FSC
         leaveMsg = "Safe mode will be turned off automatically";
       }
       if(isManual()) {
-        if(getDistributedUpgradeState())
+        if(upgradeManager.getUpgradeState())
           return leaveMsg + " upon completion of " + 
             "the distributed upgrade: upgrade progress = " + 
-            getDistributedUpgradeStatus() + "%";
+            upgradeManager.getUpgradeStatus() + "%";
         leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
       }
 
@@ -3757,8 +3214,9 @@ public class FSNamesystem implements FSC
     }
     return isInSafeMode();
   }
-  
-  private void checkSafeMode() {
+
+  /** Check and trigger safe mode. */
+  public void checkSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode != null) {
@@ -3826,7 +3284,7 @@ public class FSNamesystem implements FSC
   /**
    * Set the total number of blocks in the system. 
    */
-  void setBlockTotal() {
+  private void setBlockTotal() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -3847,7 +3305,7 @@ public class FSNamesystem implements FSC
    * Get the total number of COMPLETE blocks in the system.
    * For safe mode only complete blocks are counted.
    */
-  long getCompleteBlocksTotal() {
+  private long getCompleteBlocksTotal() {
     // Calculate number of blocks under construction
     long numUCBlocks = 0;
     readLock();
@@ -3918,7 +3376,7 @@ public class FSNamesystem implements FSC
         NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
         return;
       }
-      if(getDistributedUpgradeState())
+      if(upgradeManager.getUpgradeState())
         throw new SafeModeException("Distributed upgrade is in progress",
                                     safeMode);
       safeMode.leave(checkForUpgrades);
@@ -3955,10 +3413,6 @@ public class FSNamesystem implements FSC
       writeUnlock();
     }
   }
-  
-  public RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
-    return getEditLog().getEditLogManifest(sinceTxId);
-  }
 
   NamenodeCommand startCheckpoint(
                                 NamenodeRegistration bnReg, // backup node
@@ -4011,26 +3465,6 @@ public class FSNamesystem implements FSC
     return upgradeManager.processUpgradeCommand(comm);
   }
 
-  int getDistributedUpgradeVersion() {
-    return upgradeManager.getUpgradeVersion();
-  }
-
-  UpgradeCommand getDistributedUpgradeCommand() throws IOException {
-    return upgradeManager.getBroadcastCommand();
-  }
-
-  boolean getDistributedUpgradeState() {
-    return upgradeManager.getUpgradeState();
-  }
-
-  short getDistributedUpgradeStatus() {
-    return upgradeManager.getUpgradeStatus();
-  }
-
-  boolean startDistributedUpgradeIfNeeded() throws IOException {
-    return upgradeManager.startUpgrade();
-  }
-
   PermissionStatus createFsOwnerPermissions(FsPermission permission) {
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
@@ -4060,7 +3494,8 @@ public class FSNamesystem implements FSC
     return checkPermission(path, false, null, null, null, null);
   }
 
-  private void checkSuperuserPrivilege() throws AccessControlException {
+  /** Check if the user has superuser privilege. */
+  public void checkSuperuserPrivilege() throws AccessControlException {
     if (isPermissionEnabled) {
       FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
     }
@@ -4131,11 +3566,6 @@ public class FSNamesystem implements FSC
     return blockManager.getUnderReplicatedBlocksCount();
   }
 
-  /** Return number of under-replicated but not missing blocks */
-  public long getUnderReplicatedNotMissingBlocks() {
-    return blockManager.getUnderReplicatedNotMissingBlocks();
-  }
-
   /** Returns number of blocks with corrupt replicas */
   @Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
   public long getCorruptReplicaBlocks() {
@@ -4173,7 +3603,7 @@ public class FSNamesystem implements FSC
    * Register the FSNamesystem MBean using the name
    *        "hadoop:service=NameNode,name=FSNamesystemState"
    */
-  void registerMBean() {
+  private void registerMBean() {
     // We can only implement one MXBean interface, so we keep the old one.
     try {
       StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
@@ -4188,7 +3618,7 @@ public class FSNamesystem implements FSC
   /**
    * shutdown FSNamesystem
    */
-  public void shutdown() {
+  void shutdown() {
     if (mbeanName != null)
       MBeans.unregister(mbeanName);
   }
@@ -4207,14 +3637,14 @@ public class FSNamesystem implements FSC
   /**
    * Sets the generation stamp for this filesystem
    */
-  public void setGenerationStamp(long stamp) {
+  void setGenerationStamp(long stamp) {
     generationStamp.setStamp(stamp);
   }
 
   /**
    * Gets the generation stamp for this filesystem
    */
-  public long getGenerationStamp() {
+  long getGenerationStamp() {
     return generationStamp.getStamp();
   }
 
@@ -4289,10 +3719,7 @@ public class FSNamesystem implements FSC
       // get a new generation stamp and an access token
       block.setGenerationStamp(nextGenerationStamp());
       locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-      if (blockManager.isBlockTokenEnabled()) {
-        locatedBlock.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(
-          block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
-      }
+      blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
     } finally {
       writeUnlock();
     }
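Judging from the inline code it replaces, the new BlockManager.setBlockToken helper presumably amounts to the following sketch (not the actual BlockManager source):

    // Inferred from the removed inline token generation above.
    void setBlockToken(LocatedBlock l, AccessMode mode) throws IOException {
      if (isBlockTokenEnabled()) {
        l.setBlockToken(getBlockTokenSecretManager().generateToken(
            l.getBlock(), EnumSet.of(mode)));
      }
    }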
@@ -4337,7 +3764,7 @@ public class FSNamesystem implements FSC
   }
 
   /** @see updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
-  void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, 
+  private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
     assert hasWriteLock();
@@ -4493,26 +3920,6 @@ public class FSNamesystem implements FSC
     return blockManager.numCorruptReplicas(blk);
   }
 
-  /**
-   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
-   * blocks starting at the next block after startingBlockId are returned
-   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
-   * is null, up to numExpectedBlocks blocks are returned from the beginning.
-   * If startingBlockId cannot be found, null is returned.
-   *
-   * @param numExpectedBlocks Number of block ids to return.
-   *  0 <= numExpectedBlocks <= 100
-   * @param startingBlockId Block id from which to start. If null, start at
-   *  beginning.
-   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
-   *
-   */
-  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
-                                   Long startingBlockId) {  
-    return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks,
-                                                  startingBlockId);
-  }
-  
   static class CorruptFileBlockInfo {
     String path;
     Block block;
@@ -4552,7 +3959,8 @@ public class FSNamesystem implements FSC
       if (startBlockAfter != null) {
         startBlockId = Block.filename2id(startBlockAfter);
       }
-      UnderReplicatedBlocks.BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
+
+      final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
         INode inode = blockManager.getINode(blk);
@@ -4575,41 +3983,17 @@ public class FSNamesystem implements FSC
   }
   
   /**
-   * @return list of datanodes where decommissioning is in progress
-   */
-  public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
-    readLock();
-    try {
-      ArrayList<DatanodeDescriptor> decommissioningNodes = 
-        new ArrayList<DatanodeDescriptor>();
-      final List<DatanodeDescriptor> results = getBlockManager(
-          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
-      for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        if (node.isDecommissionInProgress()) {
-          decommissioningNodes.add(node);
-        }
-      }
-      return decommissioningNodes;
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
    * Create delegation token secret manager
    */
   private DelegationTokenSecretManager createDelegationTokenSecretManager(
       Configuration conf) {
     return new DelegationTokenSecretManager(conf.getLong(
-        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
-        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT),
-        conf.getLong(
-            DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
-            DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
-        conf.getLong(
-            DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
-            DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
+        DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+        DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT),
+        conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+            DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
+        conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+            DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
         DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this);
   }
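All three intervals are read from configuration in milliseconds. For illustration, they could be tightened before the namesystem starts; the property names below are assumed to be the string values behind the DFS_NAMENODE_DELEGATION_* keys:

    Configuration conf = new Configuration();
    // Assumed property names; all values are in milliseconds.
    conf.setLong("dfs.namenode.delegation.key.update-interval",
        24L * 60 * 60 * 1000);            // roll master keys daily
    conf.setLong("dfs.namenode.delegation.token.max-lifetime",
        7L * 24 * 60 * 60 * 1000);        // tokens expire after a week
    conf.setLong("dfs.namenode.delegation.token.renew-interval",
        24L * 60 * 60 * 1000);            // must renew at least daily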
 
@@ -4617,7 +4001,7 @@ public class FSNamesystem implements FSC
    * Returns the DelegationTokenSecretManager instance in the namesystem.
    * @return delegation token secret manager object
    */
-  public DelegationTokenSecretManager getDelegationTokenSecretManager() {
+  DelegationTokenSecretManager getDelegationTokenSecretManager() {
     return dtSecretManager;
   }
 
@@ -4626,7 +4010,7 @@ public class FSNamesystem implements FSC
    * @return Token<DelegationTokenIdentifier>
    * @throws IOException
    */
-  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+  Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
     Token<DelegationTokenIdentifier> token;
     writeLock();
@@ -4670,7 +4054,7 @@ public class FSNamesystem implements FSC
    * @throws InvalidToken
    * @throws IOException
    */
-  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+  long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     long expiryTime;
     writeLock();
@@ -4701,7 +4085,7 @@ public class FSNamesystem implements FSC
    * @param token
    * @throws IOException
    */
-  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+  void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
     writeLock();
     try {
@@ -4854,31 +4238,27 @@ public class FSNamesystem implements FSC
 
   @Override // NameNodeMXBean
   public long getNonDfsUsedSpace() {
-    return getCapacityUsedNonDFS();
+    return datanodeStatistics.getCapacityUsedNonDFS();
   }
 
   @Override // NameNodeMXBean
   public float getPercentUsed() {
-    return getCapacityUsedPercent();
+    return datanodeStatistics.getCapacityUsedPercent();
   }
 
   @Override // NameNodeMXBean
   public long getBlockPoolUsedSpace() {
-    synchronized(heartbeats) {
-      return blockPoolUsed;
-    }
+    return datanodeStatistics.getBlockPoolUsed();
   }
 
   @Override // NameNodeMXBean
   public float getPercentBlockPoolUsed() {
-    synchronized(heartbeats) {
-      return DFSUtil.getPercentUsed(blockPoolUsed, capacityTotal);
-    }
+    return datanodeStatistics.getPercentBlockPoolUsed();
   }
 
   @Override // NameNodeMXBean
   public float getPercentRemaining() {
-    return getCapacityRemainingPercent();
+    return datanodeStatistics.getCapacityRemainingPercent();
   }
 
   @Override // NameNodeMXBean
@@ -4910,13 +4290,9 @@ public class FSNamesystem implements FSC
   public String getLiveNodes() {
     final Map<String, Map<String,Object>> info = 
       new HashMap<String, Map<String,Object>>();
-    final ArrayList<DatanodeDescriptor> liveNodeList = 
-      new ArrayList<DatanodeDescriptor>();
-    final ArrayList<DatanodeDescriptor> deadNodeList =
-      new ArrayList<DatanodeDescriptor>();
-    DFSNodesStatus(liveNodeList, deadNodeList);
-    removeDecomNodeFromList(liveNodeList);
-    for (DatanodeDescriptor node : liveNodeList) {
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
+    for (DatanodeDescriptor node : live) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
       innerinfo.put("usedSpace", getDfsUsed(node));
@@ -4934,14 +4310,9 @@ public class FSNamesystem implements FSC
   public String getDeadNodes() {
     final Map<String, Map<String, Object>> info = 
       new HashMap<String, Map<String, Object>>();
-    final ArrayList<DatanodeDescriptor> liveNodeList =
-    new ArrayList<DatanodeDescriptor>();
-    final ArrayList<DatanodeDescriptor> deadNodeList =
-    new ArrayList<DatanodeDescriptor>();
-    // we need to call DFSNodesStatus to separate out the dead datanodes
-    DFSNodesStatus(liveNodeList, deadNodeList);
-    removeDecomNodeFromList(deadNodeList);
-    for (DatanodeDescriptor node : deadNodeList) {
+    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    blockManager.getDatanodeManager().fetchDatanodes(null, dead, true);
+    for (DatanodeDescriptor node : dead) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
       innerinfo.put("decommissioned", node.isDecommissioned());
@@ -4958,8 +4329,8 @@ public class FSNamesystem implements FSC
   public String getDecomNodes() {
     final Map<String, Map<String, Object>> info = 
       new HashMap<String, Map<String, Object>>();
-    final ArrayList<DatanodeDescriptor> decomNodeList = 
-      this.getDecommissioningNodes();
+    final List<DatanodeDescriptor> decomNodeList = blockManager.getDatanodeManager(
+        ).getDecommissioningNodes();
     for (DatanodeDescriptor node : decomNodeList) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("underReplicatedBlocks", node.decommissioningStatus
@@ -4995,18 +4366,4 @@ public class FSNamesystem implements FSC
   public BlockManager getBlockManager() {
     return blockManager;
   }
-  
-  void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
-    getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
-  }
-
-  /**
-   * Tell all datanodes to use a new, non-persistent bandwidth value for
-   * dfs.datanode.balance.bandwidthPerSec.
-   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
-   * @throws IOException
-   */
-  public void setBalancerBandwidth(long bandwidth) throws IOException {
-    getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
-  }
 }


