hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1166495 [4/9] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/docs/src/documentation/content/xdocs/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ ...
Date: Thu, 08 Sep 2011 01:39:32 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Thu Sep  8 01:39:07 2011
@@ -75,7 +75,7 @@ public class GetDelegationTokenServlet e
           + ":" + NameNode.getAddress(conf).getPort();
 
           Token<DelegationTokenIdentifier> token = 
-            nn.getDelegationToken(new Text(renewerFinal));
+            nn.getRpcServer().getDelegationToken(new Text(renewerFinal));
           if(token == null) {
             throw new Exception("couldn't get the token for " +s);
           }
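
The change above is the pattern repeated throughout this commit: protocol methods moved off NameNode itself and are now reached through the NamenodeProtocols facade returned by getRpcServer(). A minimal sketch of the new calling convention, assuming an initialized NameNode nn; the wrapper class and variable names are illustrative, not part of the commit:

    // Hedged sketch: fetching a delegation token via the RPC-server facade.
    // Only getRpcServer() and getDelegationToken(Text) come from the commit;
    // the wrapper class is illustrative.
    import java.io.IOException;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;

    class TokenFetchSketch {
      static Token<DelegationTokenIdentifier> fetch(NameNode nn, String renewer)
          throws IOException {
        Token<DelegationTokenIdentifier> token =
            nn.getRpcServer().getDelegationToken(new Text(renewer));
        if (token == null) {
          throw new IOException("could not get a delegation token for " + renewer);
        }
        return token;
      }
    }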

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu Sep  8 01:39:07 2011
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 
 /**
  * I-node for file being written.

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Thu Sep  8 01:39:07 2011
@@ -41,6 +41,25 @@ interface JournalManager {
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
+  /**
+   * Get the input stream starting with fromTxnId from this journal manager
+   * @param fromTxnId the first transaction id we want to read
+   * @return the stream starting with transaction fromTxnId
+   * @throws IOException if a stream cannot be found.
+   */
+  EditLogInputStream getInputStream(long fromTxnId) throws IOException;
+
+  /**
+   * Get the number of transactions contiguously available from fromTxnId.
+   *
+   * @param fromTxnId Transaction id to count from
+   * @return The number of transactions available from fromTxnId
+   * @throws IOException if the journal cannot be read.
+   * @throws CorruptionException if there is a gap in the journal at fromTxnId.
+   */
+  long getNumberOfTransactions(long fromTxnId) 
+      throws IOException, CorruptionException;
+
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */
@@ -59,10 +78,21 @@ interface JournalManager {
     throws IOException;
 
   /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. May return null if this journal
-   * manager does not support this operation.
-   */  
-  EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
-    throws IOException;
+   * Recover segments which have not been finalized.
+   */
+  void recoverUnfinalizedSegments() throws IOException;
+
+  /** 
+   * Indicates that a journal cannot be used to load a certain range of
+   * edits.
+   * This exception occurs in the case of a gap in the transactions, or a
+   * corrupt edit file.
+   */
+  public static class CorruptionException extends IOException {
+    static final long serialVersionUID = -4687802717006172702L;
+    
+    public CorruptionException(String reason) {
+      super(reason);
+    }
+  }
 }
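
The two new read-side methods are intended to be used together: a caller first asks how many transactions are contiguously available from a given id, then opens a stream at that id; CorruptionException signals a gap or a corrupt edit file. A rough sketch of that pattern, assuming a JournalManager instance jm (the replay loop is elided and illustrative):

    // Hedged sketch of driving the new JournalManager read methods.
    // getNumberOfTransactions(), getInputStream() and CorruptionException
    // are from the interface above; everything else is illustrative.
    import java.io.IOException;

    class JournalReadSketch {
      static void replayFrom(JournalManager jm, long fromTxnId)
          throws IOException {
        try {
          long available = jm.getNumberOfTransactions(fromTxnId);
          EditLogInputStream in = jm.getInputStream(fromTxnId);
          // ... read up to `available` transactions from `in` and apply them ...
        } catch (JournalManager.CorruptionException ce) {
          // Gap in the transactions, or a corrupt edit file, at fromTxnId.
          throw new IOException("cannot read journal at txid " + fromTxnId, ce);
        }
      }
    }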

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Thu Sep  8 01:39:07 2011
@@ -32,8 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
@@ -65,8 +65,8 @@ public class LeaseManager {
 
   private final FSNamesystem fsnamesystem;
 
-  private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
-  private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;
+  private long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+  private long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD;
 
   //
   // Used for handling lock-leases
@@ -379,7 +379,7 @@ public class LeaseManager {
 
 
         try {
-          Thread.sleep(HdfsConstants.NAMENODE_LEASE_RECHECK_INTERVAL);
+          Thread.sleep(HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL);
         } catch(InterruptedException ie) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(name + " is interrupted", ie);
@@ -409,7 +409,7 @@ public class LeaseManager {
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          if(fsnamesystem.internalReleaseLease(oldest, p, HdfsConstants.NAMENODE_LEASE_HOLDER)) {
+          if(fsnamesystem.internalReleaseLease(oldest, p, HdfsServerConstants.NAMENODE_LEASE_HOLDER)) {
             LOG.info("Lease recovery for file " + p +
                           " is complete. File closed.");
             removing.add(p);
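
The renames above follow the commit's overall split: constants that are part of the client-facing protocol stay in org.apache.hadoop.hdfs.protocol.HdfsConstants (formerly FSConstants), while server-only constants move to org.apache.hadoop.hdfs.server.common.HdfsServerConstants (formerly server.common.HdfsConstants). A compact sketch mirroring the LeaseManager usage above; the wrapper class is illustrative:

    // Hedged sketch of the renamed constants as used in LeaseManager.
    // The constants come from the commit; the class is illustrative.
    import org.apache.hadoop.hdfs.protocol.HdfsConstants;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;

    class LeaseLimitsSketch {
      // Client-visible lease periods (protocol-side constants).
      long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
      long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD;
      // NameNode-internal monitor settings (server-side constants).
      long recheckMs = HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL;
      String holder  = HdfsServerConstants.NAMENODE_LEASE_HOLDER;
    }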

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Thu Sep  8 01:39:07 2011
@@ -42,11 +42,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
@@ -126,7 +126,7 @@ public class NNStorage extends Storage i
    * recent fsimage file. This does not include any transactions
    * that have since been written to the edit log.
    */
-  protected long mostRecentCheckpointTxId = FSConstants.INVALID_TXID;
+  protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
 
   /**
    * list of failed (and thus removed) storages
@@ -501,7 +501,7 @@ public class NNStorage extends Storage i
    * Format all available storage directories.
    */
   public void format(String clusterId) throws IOException {
-    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
     this.namespaceID = newNamespaceID();
     this.clusterID = clusterId;
     this.blockpoolID = newBlockPoolID();
@@ -574,7 +574,7 @@ public class NNStorage extends Storage i
    * This should only be used during upgrades.
    */
   String getDeprecatedProperty(String prop) {
-    assert getLayoutVersion() > FSConstants.LAYOUT_VERSION :
+    assert getLayoutVersion() > HdfsConstants.LAYOUT_VERSION :
       "getDeprecatedProperty should only be done when loading " +
       "storage from past versions during upgrade.";
     return deprecatedProperties.get(prop);
@@ -764,7 +764,7 @@ public class NNStorage extends Storage i
       if(upgradeManager.getDistributedUpgrades() != null)
         throw new IOException("\n   Distributed upgrade for NameNode version "
                               + upgradeManager.getUpgradeVersion()
-                              + " to current LV " + FSConstants.LAYOUT_VERSION
+                              + " to current LV " + HdfsConstants.LAYOUT_VERSION
                               + " is required.\n   Please restart NameNode"
                               + " with -upgrade option.");
     }
@@ -780,7 +780,7 @@ public class NNStorage extends Storage i
     writeAll();
     LOG.info("\n   Distributed upgrade for NameNode version "
              + upgradeManager.getUpgradeVersion() + " to current LV "
-             + FSConstants.LAYOUT_VERSION + " is initialized.");
+             + HdfsConstants.LAYOUT_VERSION + " is initialized.");
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Sep  8 01:39:07 2011
@@ -21,9 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -34,82 +32,37 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
-import static org.apache.hadoop.hdfs.protocol.FSConstants.MAX_PATH_LENGTH;
-import static org.apache.hadoop.hdfs.protocol.FSConstants.MAX_PATH_DEPTH;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
@@ -152,7 +105,7 @@ import org.apache.hadoop.util.StringUtil
  * NameNode state, for example partial blocksMap etc.
  **********************************************************/
 @InterfaceAudience.Private
-public class NameNode implements NamenodeProtocols {
+public class NameNode {
   static{
     HdfsConfiguration.init();
   }
@@ -219,12 +172,6 @@ public class NameNode implements Namenod
   }
     
 
-  @Override // VersionedProtocol
-  public ProtocolSignature getProtocolSignature(String protocol,
-      long clientVersion, int clientMethodsHash) throws IOException {
-    return ProtocolSignature.getProtocolSignature(
-        this, protocol, clientVersion, clientMethodsHash);
-  }
 
   public static final int DEFAULT_PORT = 8020;
 
@@ -239,18 +186,6 @@ public class NameNode implements Namenod
   private final boolean haEnabled;
 
   
-  /** RPC server. Package-protected for use in tests. */
-  Server server;
-  /** RPC server for HDFS Services communication.
-      BackupNode, Datanodes and all other services
-      should be connecting to this server if it is
-      configured. Clients should only go to NameNode#server
-  */
-  protected Server serviceRpcServer;
-  /** RPC server address */
-  protected InetSocketAddress rpcAddress = null;
-  /** RPC server for DN address */
-  protected InetSocketAddress serviceRPCAddress = null;
   /** httpServer */
   protected NameNodeHttpServer httpServer;
   private Thread emptier;
@@ -258,11 +193,11 @@ public class NameNode implements Namenod
   protected boolean stopRequested = false;
   /** Registration information of this name-node  */
   protected NamenodeRegistration nodeRegistration;
-  /** Is service level authorization enabled? */
-  private boolean serviceAuthEnabled = false;
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
   
+  private NameNodeRpcServer rpcServer;
+  
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
   public static void format(Configuration conf) throws IOException {
@@ -278,6 +213,10 @@ public class NameNode implements Namenod
     return namesystem;
   }
 
+  public NamenodeProtocols getRpcServer() {
+    return rpcServer;
+  }
+  
   static void initMetrics(Configuration conf, NamenodeRole role) {
     metrics = NameNodeMetrics.create(conf, role);
   }
@@ -327,19 +266,19 @@ public class NameNode implements Namenod
    * @param filesystemURI
    * @return address of file system
    */
-  public static InetSocketAddress getAddress(URI filesystemURI) {
+  static InetSocketAddress getAddress(URI filesystemURI) {
     String authority = filesystemURI.getAuthority();
     if (authority == null) {
       throw new IllegalArgumentException(String.format(
           "Invalid URI for NameNode address (check %s): %s has no authority.",
           FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString()));
     }
-    if (!FSConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
+    if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
         filesystemURI.getScheme())) {
       throw new IllegalArgumentException(String.format(
           "Invalid URI for NameNode address (check %s): %s is not of scheme '%s'.",
           FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString(),
-          FSConstants.HDFS_URI_SCHEME));
+          HdfsConstants.HDFS_URI_SCHEME));
     }
     return getAddress(authority);
   }
@@ -347,7 +286,7 @@ public class NameNode implements Namenod
   public static URI getUri(InetSocketAddress namenode) {
     int port = namenode.getPort();
     String portString = port == DEFAULT_PORT ? "" : (":"+port);
-    return URI.create(FSConstants.HDFS_URI_SCHEME + "://" 
+    return URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" 
         + namenode.getHostName()+portString);
   }
 
@@ -385,11 +324,13 @@ public class NameNode implements Namenod
   /**
    * Modifies the configuration passed to contain the service rpc address setting
    */
-  protected void setRpcServiceServerAddress(Configuration conf) {
+  protected void setRpcServiceServerAddress(Configuration conf,
+      InetSocketAddress serviceRPCAddress) {
     setServiceAddress(conf, getHostPortString(serviceRPCAddress));
   }
 
-  protected void setRpcServerAddress(Configuration conf) {
+  protected void setRpcServerAddress(Configuration conf,
+      InetSocketAddress rpcAddress) {
     FileSystem.setDefaultUri(conf, getUri(rpcAddress));
   }
 
@@ -404,7 +345,7 @@ public class NameNode implements Namenod
   }
 
   protected void loadNamesystem(Configuration conf) throws IOException {
-    this.namesystem = new FSNamesystem(conf);
+    this.namesystem = FSNamesystem.loadFromDisk(conf);
   }
 
   NamenodeRegistration getRegistration() {
@@ -413,7 +354,7 @@ public class NameNode implements Namenod
 
   NamenodeRegistration setRegistration() {
     nodeRegistration = new NamenodeRegistration(
-        getHostPortString(rpcAddress),
+        getHostPortString(rpcServer.getRpcAddress()),
         getHostPortString(getHttpAddress()),
         getFSImage().getStorage(), getRole());
     return nodeRegistration;
@@ -435,45 +376,13 @@ public class NameNode implements Namenod
    */
   protected void initialize(Configuration conf) throws IOException {
     initializeGenericKeys(conf);
-    InetSocketAddress socAddr = getRpcServerAddress(conf);
     UserGroupInformation.setConfiguration(conf);
     loginAsNameNodeUser(conf);
-    int handlerCount = 
-      conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
-                  DFS_DATANODE_HANDLER_COUNT_DEFAULT);
 
     NameNode.initMetrics(conf, this.getRole());
     loadNamesystem(conf);
-    // create rpc server
-    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
-    if (dnSocketAddr != null) {
-      int serviceHandlerCount =
-        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
-                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
-      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
-          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
-          false, conf, namesystem.getDelegationTokenSecretManager());
-      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
-      setRpcServiceServerAddress(conf);
-    }
-    this.server = RPC.getServer(NamenodeProtocols.class, this,
-                                socAddr.getHostName(), socAddr.getPort(),
-                                handlerCount, false, conf, 
-                                namesystem.getDelegationTokenSecretManager());
-
-    // set service-level authorization security policy
-    if (serviceAuthEnabled =
-          conf.getBoolean(
-            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
-      if (this.serviceRpcServer != null) {
-        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
-      }
-    }
 
-    // The rpc-server port can be ephemeral... ensure we have the correct info
-    this.rpcAddress = this.server.getListenerAddress(); 
-    setRpcServerAddress(conf);
+    rpcServer = createRpcServer(conf);
     
     try {
       validateConfigurationSettings(conf);
@@ -486,6 +395,15 @@ public class NameNode implements Namenod
   }
   
   /**
+   * Create the RPC server implementation. Used as an extension point for the
+   * BackupNode.
+   */
+  protected NameNodeRpcServer createRpcServer(Configuration conf)
+      throws IOException {
+    return new NameNodeRpcServer(conf, this);
+  }
+
+  /**
    * Verifies that the final Configuration Settings look ok for the NameNode to
    * properly start up
    * Things to check for include:
@@ -517,10 +435,7 @@ public class NameNode implements Namenod
     }
     namesystem.activate(conf);
     startHttpServer(conf);
-    server.start();  //start RPC server
-    if (serviceRpcServer != null) {
-      serviceRpcServer.start();      
-    }
+    rpcServer.start();
     startTrashEmptier(conf);
     
     plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
@@ -532,9 +447,10 @@ public class NameNode implements Namenod
         LOG.warn("ServicePlugin " + p + " could not be started", t);
       }
     }
-    LOG.info(getRole() + " up at: " + rpcAddress);
-    if (serviceRPCAddress != null) {
-      LOG.info(getRole() + " service server is up at: " + serviceRPCAddress); 
+    
+    LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress());
+    if (rpcServer.getServiceRpcAddress() != null) {
+      LOG.info(getRole() + " service server is up at: " + rpcServer.getServiceRpcAddress()); 
     }
   }
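
The createRpcServer() method added above is documented as an extension point, so a subclass such as the BackupNode can supply its own RPC server. A hypothetical sketch of such an override; only the method signature comes from the commit, and the subclass shown is illustrative:

    // Hedged sketch of the new extension point. NameNode(Configuration)
    // and createRpcServer(Configuration) are assumed from the surrounding
    // code; the subclass is illustrative, and the default NameNodeRpcServer
    // stands in for a BackupNode-specific implementation.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class BackupNodeSketch extends NameNode {
      BackupNodeSketch(Configuration conf) throws IOException {
        super(conf);
      }

      @Override
      protected NameNodeRpcServer createRpcServer(Configuration conf)
          throws IOException {
        // A BackupNode would return a specialized server here.
        return new NameNodeRpcServer(conf, this);
      }
    }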
 
@@ -605,7 +521,7 @@ public class NameNode implements Namenod
    */
   public void join() {
     try {
-      this.server.join();
+      this.rpcServer.join();
     } catch (InterruptedException ie) {
     }
   }
@@ -635,8 +551,7 @@ public class NameNode implements Namenod
     }
     if(namesystem != null) namesystem.close();
     if(emptier != null) emptier.interrupt();
-    if(server != null) server.stop();
-    if(serviceRpcServer != null) serviceRpcServer.stop();
+    if(rpcServer != null) rpcServer.stop();
     if (metrics != null) {
       metrics.shutdown();
     }
@@ -649,440 +564,6 @@ public class NameNode implements Namenod
     return stopRequested;
   }
 
-  /////////////////////////////////////////////////////
-  // NamenodeProtocol
-  /////////////////////////////////////////////////////
-  @Override // NamenodeProtocol
-  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
-  throws IOException {
-    if(size <= 0) {
-      throw new IllegalArgumentException(
-        "Unexpected not positive size: "+size);
-    }
-
-    return namesystem.getBlockManager().getBlocks(datanode, size); 
-  }
-
-  @Override // NamenodeProtocol
-  public ExportedBlockKeys getBlockKeys() throws IOException {
-    return namesystem.getBlockManager().getBlockKeys();
-  }
-
-  @Override // NamenodeProtocol
-  public void errorReport(NamenodeRegistration registration,
-                          int errorCode, 
-                          String msg) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    verifyRequest(registration);
-    LOG.info("Error report from " + registration + ": " + msg);
-    if(errorCode == FATAL)
-      namesystem.releaseBackupNode(registration);
-  }
-
-  @Override // NamenodeProtocol
-  public NamenodeRegistration register(NamenodeRegistration registration)
-  throws IOException {
-    verifyVersion(registration.getVersion());
-    NamenodeRegistration myRegistration = setRegistration();
-    namesystem.registerBackupNode(registration, myRegistration);
-    return myRegistration;
-  }
-
-  @Override // NamenodeProtocol
-  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
-  throws IOException {
-    verifyRequest(registration);
-    if(!isRole(NamenodeRole.NAMENODE))
-      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
-    return namesystem.startCheckpoint(registration, setRegistration());
-  }
-
-  @Override // NamenodeProtocol
-  public void endCheckpoint(NamenodeRegistration registration,
-                            CheckpointSignature sig) throws IOException {
-    checkOperation(OperationCategory.CHECKPOINT);
-    namesystem.endCheckpoint(registration, sig);
-  }
-
-  @Override // ClientProtocol
-  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    return namesystem.getDelegationToken(renewer);
-  }
-
-  @Override // ClientProtocol
-  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
-      throws InvalidToken, IOException {
-    checkOperation(OperationCategory.WRITE);
-    return namesystem.renewDelegationToken(token);
-  }
-
-  @Override // ClientProtocol
-  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.cancelDelegationToken(token);
-  }
-  
-  @Override // ClientProtocol
-  public LocatedBlocks getBlockLocations(String src, 
-                                          long offset, 
-                                          long length) 
-      throws IOException {
-    checkOperation(OperationCategory.READ);
-    metrics.incrGetBlockLocations();
-    return namesystem.getBlockLocations(getClientMachine(), 
-                                        src, offset, length);
-  }
-  
-  @Override // ClientProtocol
-  public FsServerDefaults getServerDefaults() throws IOException {
-    return namesystem.getServerDefaults();
-  }
-
-  @Override // ClientProtocol
-  public void create(String src, 
-                     FsPermission masked,
-                     String clientName, 
-                     EnumSetWritable<CreateFlag> flag,
-                     boolean createParent,
-                     short replication,
-                     long blockSize) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    String clientMachine = getClientMachine();
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.create: file "
-                         +src+" for "+clientName+" at "+clientMachine);
-    }
-    if (!checkPathLength(src)) {
-      throw new IOException("create: Pathname too long.  Limit "
-          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    namesystem.startFile(src,
-        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
-            null, masked),
-        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
-    metrics.incrFilesCreated();
-    metrics.incrCreateFileOps();
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock append(String src, String clientName) 
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    String clientMachine = getClientMachine();
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.append: file "
-          +src+" for "+clientName+" at "+clientMachine);
-    }
-    LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
-    metrics.incrFilesAppended();
-    return info;
-  }
-
-  @Override // ClientProtocol
-  public boolean recoverLease(String src, String clientName) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    String clientMachine = getClientMachine();
-    return namesystem.recoverLease(src, clientName, clientMachine);
-  }
-
-  @Override // ClientProtocol
-  public boolean setReplication(String src, short replication) 
-    throws IOException {  
-    checkOperation(OperationCategory.WRITE);
-    return namesystem.setReplication(src, replication);
-  }
-    
-  @Override // ClientProtocol
-  public void setPermission(String src, FsPermission permissions)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.setPermission(src, permissions);
-  }
-
-  @Override // ClientProtocol
-  public void setOwner(String src, String username, String groupname)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.setOwner(src, username, groupname);
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock addBlock(String src,
-                               String clientName,
-                               ExtendedBlock previous,
-                               DatanodeInfo[] excludedNodes)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
-          +src+" for "+clientName);
-    }
-    HashMap<Node, Node> excludedNodesSet = null;
-    if (excludedNodes != null) {
-      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
-      for (Node node:excludedNodes) {
-        excludedNodesSet.put(node, node);
-      }
-    }
-    LocatedBlock locatedBlock = 
-      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
-    if (locatedBlock != null)
-      metrics.incrAddBlockOps();
-    return locatedBlock;
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
-      final int numAdditionalNodes, final String clientName
-      ) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getAdditionalDatanode: src=" + src
-          + ", blk=" + blk
-          + ", existings=" + Arrays.asList(existings)
-          + ", excludes=" + Arrays.asList(excludes)
-          + ", numAdditionalNodes=" + numAdditionalNodes
-          + ", clientName=" + clientName);
-    }
-
-    metrics.incrGetAdditionalDatanodeOps();
-
-    HashMap<Node, Node> excludeSet = null;
-    if (excludes != null) {
-      excludeSet = new HashMap<Node, Node>(excludes.length);
-      for (Node node : excludes) {
-        excludeSet.put(node, node);
-      }
-    }
-    return namesystem.getAdditionalDatanode(src, blk,
-        existings, excludeSet, numAdditionalNodes, clientName);
-  }
-
-  /**
-   * The client needs to give up on the block.
-   */
-  @Override // ClientProtocol
-  public void abandonBlock(ExtendedBlock b, String src, String holder)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
-          +b+" of file "+src);
-    }
-    if (!namesystem.abandonBlock(b, src, holder)) {
-      throw new IOException("Cannot abandon block during write to " + src);
-    }
-  }
-
-  @Override // ClientProtocol
-  public boolean complete(String src, String clientName, ExtendedBlock last)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.complete: "
-          + src + " for " + clientName);
-    }
-    return namesystem.completeFile(src, clientName, last);
-  }
-
-  /**
-   * The client has detected an error on the specified located blocks 
-   * and is reporting them to the server.  For now, the namenode will 
-   * mark the block as corrupt.  In the future we might 
-   * check the blocks are actually corrupt. 
-   */
-  @Override // ClientProtocol, DatanodeProtocol
-  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
-    for (int i = 0; i < blocks.length; i++) {
-      ExtendedBlock blk = blocks[i].getBlock();
-      DatanodeInfo[] nodes = blocks[i].getLocations();
-      for (int j = 0; j < nodes.length; j++) {
-        DatanodeInfo dn = nodes[j];
-        namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
-      }
-    }
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    return namesystem.updateBlockForPipeline(block, clientName);
-  }
-
-
-  @Override // ClientProtocol
-  public void updatePipeline(String clientName, ExtendedBlock oldBlock,
-      ExtendedBlock newBlock, DatanodeID[] newNodes)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
-  }
-  
-  @Override // DatanodeProtocol
-  public void commitBlockSynchronization(ExtendedBlock block,
-      long newgenerationstamp, long newlength,
-      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.commitBlockSynchronization(block,
-        newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
-  }
-  
-  @Override // ClientProtocol
-  public long getPreferredBlockSize(String filename) 
-      throws IOException {
-    checkOperation(OperationCategory.READ);
-    return namesystem.getPreferredBlockSize(filename);
-  }
-    
-  @Deprecated
-  @Override // ClientProtocol
-  public boolean rename(String src, String dst) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
-    }
-    if (!checkPathLength(dst)) {
-      throw new IOException("rename: Pathname too long.  Limit "
-          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    boolean ret = namesystem.renameTo(src, dst);
-    if (ret) {
-      metrics.incrFilesRenamed();
-    }
-    return ret;
-  }
-  
-  @Override // ClientProtocol
-  public void concat(String trg, String[] src) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.concat(trg, src);
-  }
-  
-  @Override // ClientProtocol
-  public void rename(String src, String dst, Options.Rename... options)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
-    }
-    if (!checkPathLength(dst)) {
-      throw new IOException("rename: Pathname too long.  Limit "
-          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    namesystem.renameTo(src, dst, options);
-    metrics.incrFilesRenamed();
-  }
-
-  @Deprecated
-  @Override // ClientProtocol
-  public boolean delete(String src) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    return delete(src, true);
-  }
-
-  @Override // ClientProtocol
-  public boolean delete(String src, boolean recursive) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
-          + ", recursive=" + recursive);
-    }
-    boolean ret = namesystem.delete(src, recursive);
-    if (ret) 
-      metrics.incrDeleteFileOps();
-    return ret;
-  }
-
-  /**
-   * Check path length does not exceed maximum.  Returns true if
-   * length and depth are okay.  Returns false if length is too long 
-   * or depth is too great.
-   */
-  private boolean checkPathLength(String src) {
-    Path srcPath = new Path(src);
-    return (src.length() <= MAX_PATH_LENGTH &&
-            srcPath.depth() <= MAX_PATH_DEPTH);
-  }
-    
-  @Override // ClientProtocol
-  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
-    }
-    if (!checkPathLength(src)) {
-      throw new IOException("mkdirs: Pathname too long.  Limit " 
-                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    return namesystem.mkdirs(src,
-        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
-            null, masked), createParent);
-  }
-
-  @Override // ClientProtocol
-  public void renewLease(String clientName) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.renewLease(clientName);        
-  }
-
-  @Override // ClientProtocol
-  public DirectoryListing getListing(String src, byte[] startAfter,
-      boolean needLocation) throws IOException {
-    checkOperation(OperationCategory.READ);
-    DirectoryListing files = namesystem.getListing(
-        src, startAfter, needLocation);
-    if (files != null) {
-      metrics.incrGetListingOps();
-      metrics.incrFilesInGetListingOps(files.getPartialListing().length);
-    }
-    return files;
-  }
-
-  @Override // ClientProtocol
-  public HdfsFileStatus getFileInfo(String src)  throws IOException {
-    checkOperation(OperationCategory.READ);
-    metrics.incrFileInfoOps();
-    return namesystem.getFileInfo(src, true);
-  }
-
-  @Override // ClientProtocol
-  public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 
-    checkOperation(OperationCategory.READ);
-    metrics.incrFileInfoOps();
-    return namesystem.getFileInfo(src, false);
-  }
-  
-  @Override
-  public long[] getStats() {
-    return namesystem.getStats();
-  }
-
-  @Override // ClientProtocol
-  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
-      throws IOException {
-    checkOperation(OperationCategory.READ);
-    DatanodeInfo results[] = namesystem.datanodeReport(type);
-    if (results == null ) {
-      throw new IOException("Cannot find datanode report");
-    }
-    return results;
-  }
-    
-  @Override // ClientProtocol
-  public boolean setSafeMode(SafeModeAction action) throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    return namesystem.setSafeMode(action);
-  }
-
   /**
    * Is the cluster currently in safe mode?
    */
@@ -1090,275 +571,8 @@ public class NameNode implements Namenod
     return namesystem.isInSafeMode();
   }
 
-  @Override // ClientProtocol
-  public boolean restoreFailedStorage(String arg) 
-      throws AccessControlException {
-    // TODO:HA decide on OperationCategory for this
-    return namesystem.restoreFailedStorage(arg);
-  }
-
-  @Override // ClientProtocol
-  public void saveNamespace() throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    namesystem.saveNamespace();
-  }
-
-  @Override // ClientProtocol
-  public void refreshNodes() throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    namesystem.getBlockManager().getDatanodeManager().refreshNodes(
-        new HdfsConfiguration());
-  }
-
-  @Override // NamenodeProtocol
-  public long getTransactionID() {
-    // TODO:HA decide on OperationCategory for this
-    return namesystem.getEditLog().getSyncTxId();
-  }
-
-  @Override // NamenodeProtocol
-  public CheckpointSignature rollEditLog() throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    return namesystem.rollEditLog();
-  }
-  
-  @Override // NamenodeProtocol
-  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
-  throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    return namesystem.getEditLog().getEditLogManifest(sinceTxId);
-  }
-    
-  @Override // ClientProtocol
-  public void finalizeUpgrade() throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    namesystem.finalizeUpgrade();
-  }
-
-  @Override // ClientProtocol
-  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
-      throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    return namesystem.distributedUpgradeProgress(action);
-  }
-
-  @Override // ClientProtocol
-  public void metaSave(String filename) throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    namesystem.metaSave(filename);
-  }
-
-  @Override // ClientProtocol
-  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
-      throws IOException {
-    checkOperation(OperationCategory.READ);
-    Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
-      namesystem.listCorruptFileBlocks(path, cookie);
-    
-    String[] files = new String[fbs.size()];
-    String lastCookie = "";
-    int i = 0;
-    for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
-      files[i++] = fb.path;
-      lastCookie = fb.block.getBlockName();
-    }
-    return new CorruptFileBlocks(files, lastCookie);
-  }
-
-  /**
-   * Tell all datanodes to use a new, non-persistent bandwidth value for
-   * dfs.datanode.balance.bandwidthPerSec.
-   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
-   * @throws IOException
-   */
-  @Override // ClientProtocol
-  public void setBalancerBandwidth(long bandwidth) throws IOException {
-    // TODO:HA decide on OperationCategory for this
-    namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
-  }
-  
-  @Override // ClientProtocol
-  public ContentSummary getContentSummary(String path) throws IOException {
-    checkOperation(OperationCategory.READ);
-    return namesystem.getContentSummary(path);
-  }
-
-  @Override // ClientProtocol
-  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
-  }
-  
-  @Override // ClientProtocol
-  public void fsync(String src, String clientName) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.fsync(src, clientName);
-  }
-
-  @Override // ClientProtocol
-  public void setTimes(String src, long mtime, long atime) 
-      throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    namesystem.setTimes(src, mtime, atime);
-  }
-
-  @Override // ClientProtocol
-  public void createSymlink(String target, String link, FsPermission dirPerms,
-      boolean createParent) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    metrics.incrCreateSymlinkOps();
-    /* We enforce the MAX_PATH_LENGTH limit even though a symlink target 
-     * URI may refer to a non-HDFS file system. 
-     */
-    if (!checkPathLength(link)) {
-      throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
-                            " character limit");
-                            
-    }
-    if ("".equals(target)) {
-      throw new IOException("Invalid symlink target");
-    }
-    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    namesystem.createSymlink(target, link,
-      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
-  }
-
-  @Override // ClientProtocol
-  public String getLinkTarget(String path) throws IOException {
-    checkOperation(OperationCategory.READ);
-    metrics.incrGetLinkTargetOps();
-    /* Resolves the first symlink in the given path, returning a
-     * new path consisting of the target of the symlink and any 
-     * remaining path components from the original path.
-     */
-    try {
-      HdfsFileStatus stat = namesystem.getFileInfo(path, false);
-      if (stat != null) {
-        // NB: getSymlink throws IOException if !stat.isSymlink() 
-        return stat.getSymlink();
-      }
-    } catch (UnresolvedPathException e) {
-      return e.getResolvedPath().toString();
-    } catch (UnresolvedLinkException e) {
-      // The NameNode should only throw an UnresolvedPathException
-      throw new AssertionError("UnresolvedLinkException thrown");
-    }
-    return null;
-  }
-
-
-  @Override // DatanodeProtocol
-  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
-      throws IOException {
-    verifyVersion(nodeReg.getVersion());
-    namesystem.registerDatanode(nodeReg);
-      
-    return nodeReg;
-  }
-
-  @Override // DatanodeProtocol
-  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xmitsInProgress, int xceiverCount, int failedVolumes)
-      throws IOException {
-    verifyRequest(nodeReg);
-    return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
-        blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
-  }
-
-  @Override // DatanodeProtocol
-  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-      String poolId, long[] blocks) throws IOException {
-    verifyRequest(nodeReg);
-    BlockListAsLongs blist = new BlockListAsLongs(blocks);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-           + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
-           + " blocks");
-    }
-
-    namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
-    if (getFSImage().isUpgradeFinalized())
-      return new DatanodeCommand.Finalize(poolId);
-    return null;
-  }
-
-  @Override // DatanodeProtocol
-  public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
-      ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
-    verifyRequest(nodeReg);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
-          +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
-          +" blocks.");
-    }
-    namesystem.getBlockManager().blockReceivedAndDeleted(
-        nodeReg, poolId, receivedAndDeletedBlocks);
-  }
-
-  @Override // DatanodeProtocol
-  public void errorReport(DatanodeRegistration nodeReg,
-                          int errorCode, String msg) throws IOException { 
-    String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
-
-    if (errorCode == DatanodeProtocol.NOTIFY) {
-      LOG.info("Error report from " + dnName + ": " + msg);
-      return;
-    }
-    verifyRequest(nodeReg);
-
-    if (errorCode == DatanodeProtocol.DISK_ERROR) {
-      LOG.warn("Disk error on " + dnName + ": " + msg);
-    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
-      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
-      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);            
-    } else {
-      LOG.info("Error report from " + dnName + ": " + msg);
-    }
-  }
-    
-  @Override // DatanodeProtocol, NamenodeProtocol
-  public NamespaceInfo versionRequest() throws IOException {
-    return namesystem.getNamespaceInfo();
-  }
-
-  @Override // DatanodeProtocol
-  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
-    return namesystem.processDistributedUpgradeCommand(comm);
-  }
-
-  /** 
-   * Verify request.
-   * 
-   * Verifies correctness of the datanode version, registration ID, and 
-   * if the datanode does not need to be shutdown.
-   * 
-   * @param nodeReg data node registration
-   * @throws IOException
-   */
-  public void verifyRequest(NodeRegistration nodeReg) throws IOException {
-    verifyVersion(nodeReg.getVersion());
-    if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
-      LOG.warn("Invalid registrationID - expected: "
-          + namesystem.getRegistrationID() + " received: "
-          + nodeReg.getRegistrationID());
-      throw new UnregisteredNodeException(nodeReg);
-    }
-  }
-    
-  /**
-   * Verify version.
-   * 
-   * @param version
-   * @throws IOException
-   */
-  public void verifyVersion(int version) throws IOException {
-    if (version != FSConstants.LAYOUT_VERSION)
-      throw new IncorrectVersionException(version, "data node");
-  }
-    
-  public FSImage getFSImage() {
+  /** get FSImage */
+  FSImage getFSImage() {
     return namesystem.dir.fsImage;
   }
 
@@ -1367,7 +581,7 @@ public class NameNode implements Namenod
    * @return namenode rpc address
    */
   public InetSocketAddress getNameNodeAddress() {
-    return rpcAddress;
+    return rpcServer.getRpcAddress();
   }
   
   /**
@@ -1376,7 +590,7 @@ public class NameNode implements Namenod
    * @return namenode service rpc address used by datanodes
    */
   public InetSocketAddress getServiceRpcAddress() {
-    return serviceRPCAddress != null ? serviceRPCAddress : rpcAddress;
+    return rpcServer.getServiceRpcAddress() != null ? rpcServer.getServiceRpcAddress() : rpcServer.getRpcAddress();
   }
 
   /**
@@ -1437,16 +651,16 @@ public class NameNode implements Namenod
     }
     System.out.println("Formatting using clusterid: " + clusterId);
     
-    FSImage fsImage = new FSImage(conf, null, dirsToFormat, editDirsToFormat);
-    FSNamesystem nsys = new FSNamesystem(fsImage, conf);
-    nsys.dir.fsImage.format(clusterId);
+    FSImage fsImage = new FSImage(conf, dirsToFormat, editDirsToFormat);
+    FSNamesystem fsn = new FSNamesystem(conf, fsImage);
+    fsImage.format(fsn, clusterId);
     return false;
   }
 
   private static boolean finalize(Configuration conf,
                                boolean isConfirmationNeeded
                                ) throws IOException {
-    FSNamesystem nsys = new FSNamesystem(new FSImage(conf), conf);
+    FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
     System.err.print(
         "\"finalize\" will remove the previous state of the files system.\n"
         + "Recent upgrade will become permanent.\n"
@@ -1461,40 +675,6 @@ public class NameNode implements Namenod
     return false;
   }
 
-  @Override // RefreshAuthorizationPolicyProtocol
-  public void refreshServiceAcl() throws IOException {
-    if (!serviceAuthEnabled) {
-      throw new AuthorizationException("Service Level Authorization not enabled!");
-    }
-
-    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
-    if (this.serviceRpcServer != null) {
-      this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
-    }
-  }
-
-  @Override // RefreshAuthorizationPolicyProtocol
-  public void refreshUserToGroupsMappings() throws IOException {
-    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + 
-             UserGroupInformation.getCurrentUser().getShortUserName());
-    Groups.getUserToGroupsMappingService().refresh();
-  }
-
-  @Override // RefreshAuthorizationPolicyProtocol
-  public void refreshSuperUserGroupsConfiguration() {
-    LOG.info("Refreshing SuperUser proxy group mapping list ");
-
-    ProxyUsers.refreshSuperUserGroupsConfiguration();
-  }
-  
-  @Override // GetUserMappingsProtocol
-  public String[] getGroupsForUser(String user) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting groups for user " + user);
-    }
-    return UserGroupInformation.createRemoteUser(user).getGroupNames();
-  }
-
   private static void printUsage() {
     System.err.println(
       "Usage: java NameNode [" +
@@ -1647,7 +827,7 @@ public class NameNode implements Namenod
     DFSUtil.setGenericConf(conf, nameserviceId, NAMESERVICE_SPECIFIC_KEYS);
     
     if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
-      URI defaultUri = URI.create(FSConstants.HDFS_URI_SCHEME + "://"
+      URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
           + conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
       conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
     }
@@ -1666,17 +846,8 @@ public class NameNode implements Namenod
       System.exit(-1);
     }
   }
-  
-  private static String getClientMachine() {
-    String clientMachine = Server.getRemoteAddress();
-    if (clientMachine == null) {
-      clientMachine = "";
-    }
-    return clientMachine;
-  }
-  
-  @Override // HAServiceProtocol
-  public synchronized void monitorHealth() throws HealthCheckFailedException {
+
+  synchronized void monitorHealth() throws HealthCheckFailedException {
     if (!haEnabled) {
      return; // no-op if HA is not enabled
     }
@@ -1684,16 +855,14 @@ public class NameNode implements Namenod
     return;
   }
   
-  @Override // HAServiceProtocol
-  public synchronized void transitionToActive() throws ServiceFailedException {
+  synchronized void transitionToActive() throws ServiceFailedException {
     if (!haEnabled) {
       throw new ServiceFailedException("HA for namenode is not enabled");
     }
     state.setState(this, ACTIVE_STATE);
   }
   
-  @Override // HAServiceProtocol
-  public synchronized void transitionToStandby() throws ServiceFailedException {
+  synchronized void transitionToStandby() throws ServiceFailedException {
     if (!haEnabled) {
       throw new ServiceFailedException("HA for namenode is not enabled");
     }

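A note on the format path in the hunks above: FSImage is now constructed
without a namesystem, the FSNamesystem constructor takes (conf, fsImage),
and format() receives the namesystem explicitly instead of being reached
through nsys.dir.fsImage. A minimal sketch of the resulting call sequence,
assuming dirsToFormat, editDirsToFormat and clusterId are already populated
as in the unchanged earlier part of format():

    // Sketch only: all three inputs come from configuration parsing
    // earlier in format(), which this patch does not touch.
    FSImage fsImage = new FSImage(conf, dirsToFormat, editDirsToFormat);
    FSNamesystem fsn = new FSNamesystem(conf, fsImage);
    // The image formats itself against the supplied namesystem rather
    // than the namesystem reaching into its own directory's image.
    fsImage.format(fsn, clusterId);
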
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java Thu Sep  8 01:39:07 2011
@@ -62,7 +62,7 @@ public class NameNodeResourceChecker {
 
     duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
-
+  
     Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
         .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
 

Copied: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (from r1166484, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?p2=hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java&r1=1166484&r2=1166495&rev=1166495&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Sep  8 01:39:07 2011
@@ -38,6 +38,9 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -61,6 +64,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -241,6 +245,7 @@ class NameNodeRpcServer implements Namen
   public void errorReport(NamenodeRegistration registration,
                           int errorCode, 
                           String msg) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     verifyRequest(registration);
     LOG.info("Error report from " + registration + ": " + msg);
     if(errorCode == FATAL)
@@ -268,27 +273,28 @@ class NameNodeRpcServer implements Namen
   @Override // NamenodeProtocol
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
-    verifyRequest(registration);
-    if(!nn.isRole(NamenodeRole.NAMENODE))
-      throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
+    nn.checkOperation(OperationCategory.CHECKPOINT);
     namesystem.endCheckpoint(registration, sig);
   }
 
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.getDelegationToken(renewer);
   }
 
   @Override // ClientProtocol
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.renewDelegationToken(token);
   }
 
   @Override // ClientProtocol
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.cancelDelegationToken(token);
   }
   
@@ -297,6 +303,7 @@ class NameNodeRpcServer implements Namen
                                           long offset, 
                                           long length) 
       throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     metrics.incrGetBlockLocations();
     return namesystem.getBlockLocations(getClientMachine(), 
                                         src, offset, length);
@@ -315,6 +322,7 @@ class NameNodeRpcServer implements Namen
                      boolean createParent,
                      short replication,
                      long blockSize) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.create: file "
@@ -335,6 +343,7 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public LocatedBlock append(String src, String clientName) 
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.append: file "
@@ -347,6 +356,7 @@ class NameNodeRpcServer implements Namen
 
   @Override // ClientProtocol
   public boolean recoverLease(String src, String clientName) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     String clientMachine = getClientMachine();
     return namesystem.recoverLease(src, clientName, clientMachine);
   }
@@ -354,18 +364,21 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public boolean setReplication(String src, short replication) 
     throws IOException {  
+    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.setReplication(src, replication);
   }
     
   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.setPermission(src, permissions);
   }
 
   @Override // ClientProtocol
   public void setOwner(String src, String username, String groupname)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.setOwner(src, username, groupname);
   }
 
@@ -375,6 +388,7 @@ class NameNodeRpcServer implements Namen
                                ExtendedBlock previous,
                                DatanodeInfo[] excludedNodes)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
           +src+" for "+clientName);
@@ -398,6 +412,7 @@ class NameNodeRpcServer implements Namen
       final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if (LOG.isDebugEnabled()) {
       LOG.debug("getAdditionalDatanode: src=" + src
           + ", blk=" + blk
@@ -419,12 +434,13 @@ class NameNodeRpcServer implements Namen
     return namesystem.getAdditionalDatanode(src, blk,
         existings, excludeSet, numAdditionalNodes, clientName);
   }
-
   /**
    * The client needs to give up on the block.
    */
+  @Override // ClientProtocol
   public void abandonBlock(ExtendedBlock b, String src, String holder)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
           +b+" of file "+src);
@@ -437,6 +453,7 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public boolean complete(String src, String clientName, ExtendedBlock last)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.complete: "
           + src + " for " + clientName);
@@ -450,8 +467,9 @@ class NameNodeRpcServer implements Namen
   * mark the block as corrupt.  In the future we might
   * check that the blocks are actually corrupt.
    */
-  @Override
+  @Override // ClientProtocol, DatanodeProtocol
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
     for (int i = 0; i < blocks.length; i++) {
       ExtendedBlock blk = blocks[i].getBlock();
@@ -466,6 +484,7 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.updateBlockForPipeline(block, clientName);
   }
 
@@ -474,6 +493,7 @@ class NameNodeRpcServer implements Namen
   public void updatePipeline(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
   }
   
@@ -482,6 +502,7 @@ class NameNodeRpcServer implements Namen
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.commitBlockSynchronization(block,
         newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
   }
@@ -489,12 +510,14 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public long getPreferredBlockSize(String filename) 
       throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     return namesystem.getPreferredBlockSize(filename);
   }
     
   @Deprecated
   @Override // ClientProtocol
   public boolean rename(String src, String dst) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
     }
@@ -511,12 +534,14 @@ class NameNodeRpcServer implements Namen
   
   @Override // ClientProtocol
   public void concat(String trg, String[] src) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.concat(trg, src);
   }
   
   @Override // ClientProtocol
   public void rename(String src, String dst, Options.Rename... options)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
     }
@@ -531,11 +556,13 @@ class NameNodeRpcServer implements Namen
   @Deprecated
   @Override // ClientProtocol
   public boolean delete(String src) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     return delete(src, true);
   }
 
   @Override // ClientProtocol
   public boolean delete(String src, boolean recursive) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
           + ", recursive=" + recursive);
@@ -560,6 +587,7 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public boolean mkdirs(String src, FsPermission masked, boolean createParent)
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
     }
@@ -574,13 +602,14 @@ class NameNodeRpcServer implements Namen
 
   @Override // ClientProtocol
   public void renewLease(String clientName) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.renewLease(clientName);        
   }
 
   @Override // ClientProtocol
   public DirectoryListing getListing(String src, byte[] startAfter,
-      boolean needLocation)
-  throws IOException {
+      boolean needLocation) throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     DirectoryListing files = namesystem.getListing(
         src, startAfter, needLocation);
     if (files != null) {
@@ -592,12 +621,14 @@ class NameNodeRpcServer implements Namen
 
   @Override // ClientProtocol
   public HdfsFileStatus getFileInfo(String src)  throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     metrics.incrFileInfoOps();
     return namesystem.getFileInfo(src, true);
   }
 
   @Override // ClientProtocol
   public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 
+    nn.checkOperation(OperationCategory.READ);
     metrics.incrFileInfoOps();
     return namesystem.getFileInfo(src, false);
   }
@@ -610,6 +641,7 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
       throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     DatanodeInfo results[] = namesystem.datanodeReport(type);
     if (results == null ) {
       throw new IOException("Cannot find datanode report");
@@ -619,61 +651,70 @@ class NameNodeRpcServer implements Namen
     
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action) throws IOException {
+    // TODO:HA decide on OperationCategory for this
     return namesystem.setSafeMode(action);
   }
-
   @Override // ClientProtocol
   public boolean restoreFailedStorage(String arg) 
       throws AccessControlException {
+    // TODO:HA decide on OperationCategory for this
     return namesystem.restoreFailedStorage(arg);
   }
 
   @Override // ClientProtocol
   public void saveNamespace() throws IOException {
+    // TODO:HA decide on OperationCategory for this
     namesystem.saveNamespace();
   }
 
   @Override // ClientProtocol
   public void refreshNodes() throws IOException {
+    // TODO:HA decide on OperationCategory for this
     namesystem.getBlockManager().getDatanodeManager().refreshNodes(
         new HdfsConfiguration());
   }
 
   @Override // NamenodeProtocol
   public long getTransactionID() {
+    // TODO:HA decide on OperationCategory for this
     return namesystem.getEditLog().getSyncTxId();
   }
 
   @Override // NamenodeProtocol
   public CheckpointSignature rollEditLog() throws IOException {
+    // TODO:HA decide on OperationCategory for this
     return namesystem.rollEditLog();
   }
   
-  @Override
+  @Override // NamenodeProtocol
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
   throws IOException {
+    // TODO:HA decide on OperationCategory for this
     return namesystem.getEditLog().getEditLogManifest(sinceTxId);
   }
     
   @Override // ClientProtocol
   public void finalizeUpgrade() throws IOException {
+    // TODO:HA decide on OperationCategory for this
     namesystem.finalizeUpgrade();
   }
 
   @Override // ClientProtocol
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
       throws IOException {
+    // TODO:HA decide on OperationCategory for this
     return namesystem.distributedUpgradeProgress(action);
   }
 
   @Override // ClientProtocol
   public void metaSave(String filename) throws IOException {
+    // TODO:HA decide on OperationCategory for this
     namesystem.metaSave(filename);
   }
-
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
       namesystem.listCorruptFileBlocks(path, cookie);
     
@@ -695,34 +736,40 @@ class NameNodeRpcServer implements Namen
    */
   @Override // ClientProtocol
   public void setBalancerBandwidth(long bandwidth) throws IOException {
+    // TODO:HA decide on OperationCategory for this
     namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
   }
   
   @Override // ClientProtocol
   public ContentSummary getContentSummary(String path) throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     return namesystem.getContentSummary(path);
   }
 
   @Override // ClientProtocol
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
   }
   
   @Override // ClientProtocol
   public void fsync(String src, String clientName) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.fsync(src, clientName);
   }
 
   @Override // ClientProtocol
   public void setTimes(String src, long mtime, long atime) 
       throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     namesystem.setTimes(src, mtime, atime);
   }
 
   @Override // ClientProtocol
   public void createSymlink(String target, String link, FsPermission dirPerms,
       boolean createParent) throws IOException {
+    nn.checkOperation(OperationCategory.WRITE);
     metrics.incrCreateSymlinkOps();
     /* We enforce the MAX_PATH_LENGTH limit even though a symlink target 
      * URI may refer to a non-HDFS file system. 
@@ -742,6 +789,7 @@ class NameNodeRpcServer implements Namen
 
   @Override // ClientProtocol
   public String getLinkTarget(String path) throws IOException {
+    nn.checkOperation(OperationCategory.READ);
     metrics.incrGetLinkTargetOps();
     /* Resolves the first symlink in the given path, returning a
      * new path consisting of the target of the symlink and any 
@@ -896,7 +944,21 @@ class NameNodeRpcServer implements Namen
     return UserGroupInformation.createRemoteUser(user).getGroupNames();
   }
 
-
+  @Override // HAServiceProtocol
+  public synchronized void monitorHealth() throws HealthCheckFailedException {
+    nn.monitorHealth();
+  }
+  
+  @Override // HAServiceProtocol
+  public synchronized void transitionToActive() throws ServiceFailedException {
+    nn.transitionToActive();
+  }
+  
+  @Override // HAServiceProtocol
+  public synchronized void transitionToStandby() throws ServiceFailedException {
+    nn.transitionToStandby();
+  }
+  
   /**
    * Verify version.
    * 

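The dominant pattern in this file is the new first line of nearly every
protocol method: nn.checkOperation(...) with a READ, WRITE or CHECKPOINT
category. NameNode.checkOperation itself is not part of this hunk, so the
following is only a self-contained sketch of the gate it implies; the
HAState enum and the standby rules are assumptions, and the TODO:HA
comments above show the category for several admin operations is still
undecided:

    import java.io.IOException;

    class HaGateSketch {
      // Mirrors the categories used by the RPC methods above.
      enum OperationCategory { READ, WRITE, CHECKPOINT }
      // Hypothetical state model; the real one lives in NameNode.
      enum HAState { ACTIVE, STANDBY }

      private final boolean haEnabled = true;   // assumed configuration
      private HAState state = HAState.STANDBY;  // assumed current state

      // Assumption: a standby rejects categories it cannot serve. Which
      // categories those are is not established by this patch.
      void checkOperation(OperationCategory op) throws IOException {
        if (haEnabled && state == HAState.STANDBY
            && op != OperationCategory.READ) {
          throw new IOException("Operation category " + op
              + " is not supported in state " + state);
        }
      }
    }
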
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Sep  8 01:39:07 2011
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@@ -173,7 +173,7 @@ public class NamenodeFsck {
       out.println(msg);
       namenode.getNamesystem().logFsckEvent(path, remoteAddress);
 
-      final HdfsFileStatus file = namenode.getFileInfo(path);
+      final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
       if (file != null) {
 
         if (showCorruptFileBlocks) {
@@ -250,7 +250,8 @@ public class NamenodeFsck {
       res.totalDirs++;
       do {
         assert lastReturnedName != null;
-        thisListing = namenode.getListing(path, lastReturnedName, false);
+        thisListing = namenode.getRpcServer().getListing(
+            path, lastReturnedName, false);
         if (thisListing == null) {
           return;
         }
@@ -385,7 +386,7 @@ public class NamenodeFsck {
         break;
       case FIXING_DELETE:
         if (!isOpen)
-          namenode.delete(path, true);
+          namenode.getRpcServer().delete(path, true);
       }
     }
     if (showFiles) {
@@ -414,7 +415,8 @@ public class NamenodeFsck {
     String target = lostFound + fullName;
     String errmsg = "Failed to move " + fullName + " to /lost+found";
     try {
-      if (!namenode.mkdirs(target, file.getPermission(), true)) {
+      if (!namenode.getRpcServer().mkdirs(
+          target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         return;
       }
@@ -502,8 +504,8 @@ public class NamenodeFsck {
       }
       try {
         s = new Socket();
-        s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
-        s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+        s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
+        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
         
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());

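Every fsck change in this file is the same mechanical substitution:
namespace calls that used to hit NameNode directly now go through
getRpcServer(). A short sketch of the pattern, assuming the fsck fields
(namenode, path, target, LOG) are in scope as above:

    // The RPC server implements NamenodeProtocols, so fsck talks to the
    // protocol surface instead of the NameNode class itself.
    NamenodeProtocols rpc = namenode.getRpcServer();
    HdfsFileStatus file = rpc.getFileInfo(path);
    if (file != null && !rpc.mkdirs(target, file.getPermission(), true)) {
      LOG.warn("Failed to move " + path + " to /lost+found");
    }
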
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Thu Sep  8 01:39:07 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -354,7 +355,7 @@ class NamenodeJspHelper {
     }
   }
 
-  static String getDelegationToken(final NameNode nn,
+  static String getDelegationToken(final NamenodeProtocols nn,
       HttpServletRequest request, Configuration conf,
       final UserGroupInformation ugi) throws IOException, InterruptedException {
     Token<DelegationTokenIdentifier> token = ugi
@@ -381,7 +382,8 @@ class NamenodeJspHelper {
         .getAttribute(JspHelper.CURRENT_CONF);
     final DatanodeID datanode = getRandomDatanode(nn);
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
-    String tokenString = getDelegationToken(nn, request, conf, ugi);
+    String tokenString = getDelegationToken(
+        nn.getRpcServer(), request, conf, ugi);
     // if the user is defined, get a delegation token and stringify it
     final String redirectLocation;
     final String nodeToRedirect;

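The signature change here is the interesting part: getDelegationToken now
accepts NamenodeProtocols, so the JSP helper is coded against the RPC
interface rather than the concrete NameNode. A sketch of a call site under
the new signature, assuming nn, request, conf and ugi are in scope as in
the helper above:

    // Callers hand in the RPC server, which implements NamenodeProtocols.
    String tokenString = getDelegationToken(
        nn.getRpcServer(), request, conf, ugi);

One practical consequence of taking the interface is that a test can pass
a stub NamenodeProtocols without standing up a full NameNode.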
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Thu Sep  8 01:39:07 2011
@@ -70,7 +70,7 @@ public class RenewDelegationTokenServlet
     try {
       long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
         public Long run() throws Exception {
-          return nn.renewDelegationToken(token);
+          return nn.getRpcServer().renewDelegationToken(token);
         }
       });
       PrintStream os = new PrintStream(resp.getOutputStream());

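The servlet change is one line, but the surrounding doAs idiom is worth
spelling out: the renewal RPC runs under the authenticated caller's UGI,
not the servlet's own identity. A sketch of the idiom, assuming ugi, nn
and token are in scope as in the servlet above:

    // The RPC executes as the remote user; an authorization failure
    // therefore reflects the caller, not the web server principal.
    long expiryTime = ugi.doAs(new PrivilegedExceptionAction<Long>() {
      public Long run() throws Exception {
        return nn.getRpcServer().renewDelegationToken(token);
      }
    });
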
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Thu Sep  8 01:39:07 2011
@@ -45,8 +45,8 @@ import static org.apache.hadoop.hdfs.DFS
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -122,6 +122,8 @@ public class SecondaryNameNode implement
   /** checkpoint once every this many transactions, regardless of time */
   private long checkpointTxnCount;
 
+  private FSNamesystem namesystem;
+
 
   /** {@inheritDoc} */
   public String toString() {
@@ -220,6 +222,8 @@ public class SecondaryNameNode implement
                                   "/tmp/hadoop/dfs/namesecondary");    
     checkpointImage = new CheckpointStorage(conf, checkpointDirs, checkpointEditsDirs);
     checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
+    
+    namesystem = new FSNamesystem(conf, checkpointImage);
 
     // Initialize other scheduling parameters from the configuration
     checkpointCheckPeriod = conf.getLong(
@@ -456,7 +460,7 @@ public class SecondaryNameNode implement
    */
   private String getInfoServer() throws IOException {
     URI fsName = FileSystem.getDefaultUri(conf);
-    if (!FSConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
+    if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
       throw new IOException("This is not a DFS");
     }
 
@@ -520,7 +524,7 @@ public class SecondaryNameNode implement
 
     boolean loadImage = downloadCheckpointFiles(
         fsName, checkpointImage, sig, manifest);   // Fetch fsimage and edits
-    doMerge(sig, manifest, loadImage, checkpointImage);
+    doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
     
     //
     // Upload the new image into the NameNode. Then tell the Namenode
@@ -750,8 +754,7 @@ public class SecondaryNameNode implement
     CheckpointStorage(Configuration conf, 
                       Collection<URI> imageDirs,
                       Collection<URI> editsDirs) throws IOException {
-      super(conf, (FSNamesystem)null, imageDirs, editsDirs);
-      setFSNamesystem(new FSNamesystem(this, conf));
+      super(conf, imageDirs, editsDirs);
       
       // the 2NN never writes edits -- it only downloads them. So
       // we shouldn't have any editLog instance. Setting to null
@@ -793,7 +796,7 @@ public class SecondaryNameNode implement
         
         StorageState curState;
         try {
-          curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR, storage);
+          curState = sd.analyzeStorage(HdfsServerConstants.StartupOption.REGULAR, storage);
           // sd is locked but not opened
           switch(curState) {
           case NON_EXISTENT:
@@ -837,7 +840,8 @@ public class SecondaryNameNode implement
     
   static void doMerge(
       CheckpointSignature sig, RemoteEditLogManifest manifest,
-      boolean loadImage, FSImage dstImage) throws IOException {   
+      boolean loadImage, FSImage dstImage, FSNamesystem dstNamesystem)
+      throws IOException {   
     NNStorage dstStorage = dstImage.getStorage();
     
     dstStorage.setStorageInfo(sig);
@@ -848,11 +852,11 @@ public class SecondaryNameNode implement
             sig.mostRecentCheckpointTxId + " even though it should have " +
             "just been downloaded");
       }
-      dstImage.reloadFromImageFile(file);
+      dstImage.reloadFromImageFile(file, dstNamesystem);
     }
     
-    Checkpointer.rollForwardByApplyingLogs(manifest, dstImage);
-    dstImage.saveFSImageInAllDirs(dstImage.getLastAppliedTxId());
+    Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
+    dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());
     dstStorage.writeAll();
   }
 }
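Threading FSNamesystem through the 2NN is the structural change in this
file: CheckpointStorage no longer manufactures its own namesystem, the
SecondaryNameNode constructs one from the checkpoint image at startup, and
doMerge, reloadFromImageFile and saveFSImageInAllDirs all take it as an
explicit argument. A condensed sketch of the checkpoint sequence after
this patch, with the download step assumed:

    // At 2NN startup (initialize):
    namesystem = new FSNamesystem(conf, checkpointImage);

    // Per checkpoint, after fsimage and edits have been fetched:
    doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
    // ...inside doMerge, the namesystem is passed down explicitly:
    //   dstImage.reloadFromImageFile(file, dstNamesystem);
    //   dstImage.saveFSImageInAllDirs(dstNamesystem,
    //       dstImage.getLastAppliedTxId());
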


