hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1169868 [2/3] - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/security/token/bloc...
Date Mon, 12 Sep 2011 18:59:43 GMT
Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1169868&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Sep 12 18:59:42 2011
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+
+/**
+ * This class is responsible for handling all of the RPC calls to the NameNode.
+ * It is created, started, and stopped by {@link NameNode}.
+ */
+class NameNodeRpcServer implements NamenodeProtocols {
+  
+  private static final Log LOG = NameNode.LOG;
+  private static final Log stateChangeLog = NameNode.stateChangeLog;
+  
+  // Dependencies from other parts of NN.
+  private final FSNamesystem namesystem;
+  protected final NameNode nn;
+  private final NameNodeMetrics metrics;
+  
+  private final boolean serviceAuthEnabled;
+
+  /** The RPC server that listens to requests from DataNodes */
+  private final RPC.Server serviceRpcServer;
+  private final InetSocketAddress serviceRPCAddress;
+  
+  /** The RPC server that listens to requests from clients */
+  protected final RPC.Server server;
+  protected final InetSocketAddress rpcAddress;
+
+  public NameNodeRpcServer(Configuration conf, NameNode nn)
+      throws IOException {
+    this.nn = nn;
+    this.namesystem = nn.getNamesystem();
+    this.metrics = NameNode.getNameNodeMetrics();
+    
+    int handlerCount = 
+      conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
+                  DFS_DATANODE_HANDLER_COUNT_DEFAULT);
+    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
+
+    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
+    if (dnSocketAddr != null) {
+      int serviceHandlerCount =
+        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
+                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
+      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
+          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
+          false, conf, namesystem.getDelegationTokenSecretManager());
+      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
+      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
+    } else {
+      serviceRpcServer = null;
+      serviceRPCAddress = null;
+    }
+    this.server = RPC.getServer(NamenodeProtocols.class, this,
+                                socAddr.getHostName(), socAddr.getPort(),
+                                handlerCount, false, conf, 
+                                namesystem.getDelegationTokenSecretManager());
+
+    // set service-level authorization security policy
+    if (serviceAuthEnabled =
+          conf.getBoolean(
+            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      if (this.serviceRpcServer != null) {
+        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      }
+    }
+
+    // The rpc-server port can be ephemeral... ensure we have the correct info
+    this.rpcAddress = this.server.getListenerAddress(); 
+    nn.setRpcServerAddress(conf, rpcAddress);
+  }
+  
+  /**
+   * Actually start serving requests.
+   */
+  void start() {
+    server.start();  //start RPC server
+    if (serviceRpcServer != null) {
+      serviceRpcServer.start();      
+    }
+  }
+  
+  /**
+   * Wait until the RPC server has shut down.
+   */
+  void join() throws InterruptedException {
+    this.server.join();
+  }
+  
+  void stop() {
+    if(server != null) server.stop();
+    if(serviceRpcServer != null) serviceRpcServer.stop();
+  }
+  
+  InetSocketAddress getServiceRpcAddress() {
+    return serviceRPCAddress;
+  }
+
+  InetSocketAddress getRpcAddress() {
+    return rpcAddress;
+  }
+  
+  @Override // VersionedProtocol
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, 
+                                 long clientVersion) throws IOException {
+    if (protocol.equals(ClientProtocol.class.getName())) {
+      return ClientProtocol.versionID; 
+    } else if (protocol.equals(DatanodeProtocol.class.getName())){
+      return DatanodeProtocol.versionID;
+    } else if (protocol.equals(NamenodeProtocol.class.getName())){
+      return NamenodeProtocol.versionID;
+    } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
+      return RefreshAuthorizationPolicyProtocol.versionID;
+    } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
+      return RefreshUserMappingsProtocol.versionID;
+    } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
+      return GetUserMappingsProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
+
+  /////////////////////////////////////////////////////
+  // NamenodeProtocol
+  /////////////////////////////////////////////////////
+  @Override // NamenodeProtocol
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+  throws IOException {
+    if(size <= 0) {
+      throw new IllegalArgumentException(
+        "Unexpected non-positive size: " + size);
+    }
+
+    return namesystem.getBlockManager().getBlocks(datanode, size); 
+  }
+
+  @Override // NamenodeProtocol
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    return namesystem.getBlockManager().getBlockKeys();
+  }
+
+  @Override // NamenodeProtocol
+  public void errorReport(NamenodeRegistration registration,
+                          int errorCode, 
+                          String msg) throws IOException {
+    verifyRequest(registration);
+    LOG.info("Error report from " + registration + ": " + msg);
+    if(errorCode == FATAL)
+      namesystem.releaseBackupNode(registration);
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeRegistration register(NamenodeRegistration registration)
+  throws IOException {
+    verifyVersion(registration.getVersion());
+    NamenodeRegistration myRegistration = nn.setRegistration();
+    namesystem.registerBackupNode(registration, myRegistration);
+    return myRegistration;
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+  throws IOException {
+    verifyRequest(registration);
+    if(!nn.isRole(NamenodeRole.NAMENODE))
+      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
+    return namesystem.startCheckpoint(registration, nn.setRegistration());
+  }
+
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+                            CheckpointSignature sig) throws IOException {
+    verifyRequest(registration);
+    if(!nn.isRole(NamenodeRole.NAMENODE))
+      throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
+    namesystem.endCheckpoint(registration, sig);
+  }
+
+  @Override // ClientProtocol
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException {
+    return namesystem.getDelegationToken(renewer);
+  }
+
+  @Override // ClientProtocol
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    return namesystem.renewDelegationToken(token);
+  }
+
+  @Override // ClientProtocol
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    namesystem.cancelDelegationToken(token);
+  }
+  
+  @Override // ClientProtocol
+  public LocatedBlocks getBlockLocations(String src, 
+                                          long offset, 
+                                          long length) 
+      throws IOException {
+    metrics.incrGetBlockLocations();
+    return namesystem.getBlockLocations(getClientMachine(), 
+                                        src, offset, length);
+  }
+  
+  @Override // ClientProtocol
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return namesystem.getServerDefaults();
+  }
+
+  @Override // ClientProtocol
+  public void create(String src, 
+                     FsPermission masked,
+                     String clientName, 
+                     EnumSetWritable<CreateFlag> flag,
+                     boolean createParent,
+                     short replication,
+                     long blockSize) throws IOException {
+    String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.create: file "
+                         +src+" for "+clientName+" at "+clientMachine);
+    }
+    if (!checkPathLength(src)) {
+      throw new IOException("create: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.startFile(src,
+        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
+            null, masked),
+        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
+    metrics.incrFilesCreated();
+    metrics.incrCreateFileOps();
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock append(String src, String clientName) 
+      throws IOException {
+    String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.append: file "
+          +src+" for "+clientName+" at "+clientMachine);
+    }
+    LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
+    metrics.incrFilesAppended();
+    return info;
+  }
+
+  @Override // ClientProtocol
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    return namesystem.recoverLease(src, clientName, clientMachine);
+  }
+
+  @Override // ClientProtocol
+  public boolean setReplication(String src, short replication) 
+    throws IOException {  
+    return namesystem.setReplication(src, replication);
+  }
+    
+  @Override // ClientProtocol
+  public void setPermission(String src, FsPermission permissions)
+      throws IOException {
+    namesystem.setPermission(src, permissions);
+  }
+
+  @Override // ClientProtocol
+  public void setOwner(String src, String username, String groupname)
+      throws IOException {
+    namesystem.setOwner(src, username, groupname);
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               ExtendedBlock previous,
+                               DatanodeInfo[] excludedNodes)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+          +src+" for "+clientName);
+    }
+    HashMap<Node, Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node node:excludedNodes) {
+        excludedNodesSet.put(node, node);
+      }
+    }
+    LocatedBlock locatedBlock = 
+      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
+    if (locatedBlock != null)
+      metrics.incrAddBlockOps();
+    return locatedBlock;
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.asList(existings)
+          + ", excludes=" + Arrays.asList(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    metrics.incrGetAdditionalDatanodeOps();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
+  /**
+   * The client needs to give up on the block.
+   */
+  @Override // ClientProtocol
+  public void abandonBlock(ExtendedBlock b, String src, String holder)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+          +b+" of file "+src);
+    }
+    if (!namesystem.abandonBlock(b, src, holder)) {
+      throw new IOException("Cannot abandon block during write to " + src);
+    }
+  }
+
+  @Override // ClientProtocol
+  public boolean complete(String src, String clientName, ExtendedBlock last)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.complete: "
+          + src + " for " + clientName);
+    }
+    return namesystem.completeFile(src, clientName, last);
+  }
+
+  /**
+   * The client has detected an error on the specified located blocks 
+   * and is reporting them to the server.  For now, the namenode will 
+   * mark the blocks as corrupt.  In the future we might 
+   * check whether the blocks are actually corrupt. 
+   */
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
+    for (int i = 0; i < blocks.length; i++) {
+      ExtendedBlock blk = blocks[i].getBlock();
+      DatanodeInfo[] nodes = blocks[i].getLocations();
+      for (int j = 0; j < nodes.length; j++) {
+        DatanodeInfo dn = nodes[j];
+        namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
+      }
+    }
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
+      throws IOException {
+    return namesystem.updateBlockForPipeline(block, clientName);
+  }
+
+
+  @Override // ClientProtocol
+  public void updatePipeline(String clientName, ExtendedBlock oldBlock,
+      ExtendedBlock newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+  }
+  
+  @Override // DatanodeProtocol
+  public void commitBlockSynchronization(ExtendedBlock block,
+      long newgenerationstamp, long newlength,
+      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
+      throws IOException {
+    namesystem.commitBlockSynchronization(block,
+        newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+  }
+  
+  @Override // ClientProtocol
+  public long getPreferredBlockSize(String filename) 
+      throws IOException {
+    return namesystem.getPreferredBlockSize(filename);
+  }
+    
+  @Deprecated
+  @Override // ClientProtocol
+  public boolean rename(String src, String dst) throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    }
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    boolean ret = namesystem.renameTo(src, dst);
+    if (ret) {
+      metrics.incrFilesRenamed();
+    }
+    return ret;
+  }
+  
+  @Override // ClientProtocol
+  public void concat(String trg, String[] src) throws IOException {
+    namesystem.concat(trg, src);
+  }
+  
+  @Override // ClientProtocol
+  public void rename(String src, String dst, Options.Rename... options)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    }
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.renameTo(src, dst, options);
+    metrics.incrFilesRenamed();
+  }
+
+  @Deprecated
+  @Override // ClientProtocol
+  public boolean delete(String src) throws IOException {
+    return delete(src, true);
+  }
+
+  @Override // ClientProtocol
+  public boolean delete(String src, boolean recursive) throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+          + ", recursive=" + recursive);
+    }
+    boolean ret = namesystem.delete(src, recursive);
+    if (ret) 
+      metrics.incrDeleteFileOps();
+    return ret;
+  }
+
+  /**
+   * Check path length does not exceed maximum.  Returns true if
+   * length and depth are okay.  Returns false if length is too long 
+   * or depth is too great.
+   */
+  private boolean checkPathLength(String src) {
+    Path srcPath = new Path(src);
+    return (src.length() <= MAX_PATH_LENGTH &&
+            srcPath.depth() <= MAX_PATH_DEPTH);
+  }
+    
+  @Override // ClientProtocol
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
+    }
+    if (!checkPathLength(src)) {
+      throw new IOException("mkdirs: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    return namesystem.mkdirs(src,
+        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
+            null, masked), createParent);
+  }
+
+  @Override // ClientProtocol
+  public void renewLease(String clientName) throws IOException {
+    namesystem.renewLease(clientName);        
+  }
+
+  @Override // ClientProtocol
+  public DirectoryListing getListing(String src, byte[] startAfter,
+      boolean needLocation)
+  throws IOException {
+    DirectoryListing files = namesystem.getListing(
+        src, startAfter, needLocation);
+    if (files != null) {
+      metrics.incrGetListingOps();
+      metrics.incrFilesInGetListingOps(files.getPartialListing().length);
+    }
+    return files;
+  }
+
+  @Override // ClientProtocol
+  public HdfsFileStatus getFileInfo(String src)  throws IOException {
+    metrics.incrFileInfoOps();
+    return namesystem.getFileInfo(src, true);
+  }
+
+  @Override // ClientProtocol
+  public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 
+    metrics.incrFileInfoOps();
+    return namesystem.getFileInfo(src, false);
+  }
+  
+  @Override
+  public long[] getStats() {
+    return namesystem.getStats();
+  }
+
+  @Override // ClientProtocol
+  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    DatanodeInfo results[] = namesystem.datanodeReport(type);
+    if (results == null ) {
+      throw new IOException("Cannot find datanode report");
+    }
+    return results;
+  }
+    
+  @Override // ClientProtocol
+  public boolean setSafeMode(SafeModeAction action) throws IOException {
+    return namesystem.setSafeMode(action);
+  }
+
+  @Override // ClientProtocol
+  public boolean restoreFailedStorage(String arg) 
+      throws AccessControlException {
+    return namesystem.restoreFailedStorage(arg);
+  }
+
+  @Override // ClientProtocol
+  public void saveNamespace() throws IOException {
+    namesystem.saveNamespace();
+  }
+
+  @Override // ClientProtocol
+  public void refreshNodes() throws IOException {
+    namesystem.getBlockManager().getDatanodeManager().refreshNodes(
+        new HdfsConfiguration());
+  }
+
+  @Override // NamenodeProtocol
+  public long getTransactionID() {
+    return namesystem.getEditLog().getSyncTxId();
+  }
+
+  @Override // NamenodeProtocol
+  public CheckpointSignature rollEditLog() throws IOException {
+    return namesystem.rollEditLog();
+  }
+  
+  @Override
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+  throws IOException {
+    return namesystem.getEditLog().getEditLogManifest(sinceTxId);
+  }
+    
+  @Override // ClientProtocol
+  public void finalizeUpgrade() throws IOException {
+    namesystem.finalizeUpgrade();
+  }
+
+  @Override // ClientProtocol
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
+      throws IOException {
+    return namesystem.distributedUpgradeProgress(action);
+  }
+
+  @Override // ClientProtocol
+  public void metaSave(String filename) throws IOException {
+    namesystem.metaSave(filename);
+  }
+
+  @Override // ClientProtocol
+  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
+      throws IOException {
+    Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
+      namesystem.listCorruptFileBlocks(path, cookie);
+    
+    String[] files = new String[fbs.size()];
+    String lastCookie = "";
+    int i = 0;
+    for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
+      files[i++] = fb.path;
+      lastCookie = fb.block.getBlockName();
+    }
+    return new CorruptFileBlocks(files, lastCookie);
+  }
+
+  /**
+   * Tell all datanodes to use a new, non-persistent bandwidth value for
+   * dfs.datanode.balance.bandwidthPerSec.
+   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
+   * @throws IOException
+   */
+  @Override // ClientProtocol
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
+  }
+  
+  @Override // ClientProtocol
+  public ContentSummary getContentSummary(String path) throws IOException {
+    return namesystem.getContentSummary(path);
+  }
+
+  @Override // ClientProtocol
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
+      throws IOException {
+    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
+  }
+  
+  @Override // ClientProtocol
+  public void fsync(String src, String clientName) throws IOException {
+    namesystem.fsync(src, clientName);
+  }
+
+  @Override // ClientProtocol
+  public void setTimes(String src, long mtime, long atime) 
+      throws IOException {
+    namesystem.setTimes(src, mtime, atime);
+  }
+
+  @Override // ClientProtocol
+  public void createSymlink(String target, String link, FsPermission dirPerms,
+      boolean createParent) throws IOException {
+    metrics.incrCreateSymlinkOps();
+    /* We enforce the MAX_PATH_LENGTH limit even though a symlink target 
+     * URI may refer to a non-HDFS file system. 
+     */
+    if (!checkPathLength(link)) {
+      throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
+                            " character limit");
+                            
+    }
+    if ("".equals(target)) {
+      throw new IOException("Invalid symlink target");
+    }
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    namesystem.createSymlink(target, link,
+      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
+  }
+
+  @Override // ClientProtocol
+  public String getLinkTarget(String path) throws IOException {
+    metrics.incrGetLinkTargetOps();
+    /* Resolves the first symlink in the given path, returning a
+     * new path consisting of the target of the symlink and any 
+     * remaining path components from the original path.
+     */
+    try {
+      HdfsFileStatus stat = namesystem.getFileInfo(path, false);
+      if (stat != null) {
+        // NB: getSymlink throws IOException if !stat.isSymlink() 
+        return stat.getSymlink();
+      }
+    } catch (UnresolvedPathException e) {
+      return e.getResolvedPath().toString();
+    } catch (UnresolvedLinkException e) {
+      // The NameNode should only throw an UnresolvedPathException
+      throw new AssertionError("UnresolvedLinkException thrown");
+    }
+    return null;
+  }
+
+
+  @Override // DatanodeProtocol
+  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
+    verifyVersion(nodeReg.getVersion());
+    namesystem.registerDatanode(nodeReg);
+      
+    return nodeReg;
+  }
+
+  @Override // DatanodeProtocol
+  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes)
+      throws IOException {
+    verifyRequest(nodeReg);
+    return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
+        blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
+  }
+
+  @Override // DatanodeProtocol
+  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
+      String poolId, long[] blocks) throws IOException {
+    verifyRequest(nodeReg);
+    BlockListAsLongs blist = new BlockListAsLongs(blocks);
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+           + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
+           + " blocks");
+    }
+
+    namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
+    if (nn.getFSImage().isUpgradeFinalized())
+      return new DatanodeCommand.Finalize(poolId);
+    return null;
+  }
+
+  @Override // DatanodeProtocol
+  public void blockReceived(DatanodeRegistration nodeReg, String poolId,
+      Block blocks[], String delHints[]) throws IOException {
+    verifyRequest(nodeReg);
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
+          +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
+    }
+    for (int i = 0; i < blocks.length; i++) {
+      namesystem.getBlockManager().blockReceived(
+          nodeReg, poolId, blocks[i], delHints[i]);
+    }
+  }
+  
+  @Override // DatanodeProtocol
+  public void errorReport(DatanodeRegistration nodeReg,
+                          int errorCode, String msg) throws IOException { 
+    String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
+
+    if (errorCode == DatanodeProtocol.NOTIFY) {
+      LOG.info("Error report from " + dnName + ": " + msg);
+      return;
+    }
+    verifyRequest(nodeReg);
+
+    if (errorCode == DatanodeProtocol.DISK_ERROR) {
+      LOG.warn("Disk error on " + dnName + ": " + msg);
+    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
+      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
+      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);            
+    } else {
+      LOG.info("Error report from " + dnName + ": " + msg);
+    }
+  }
+    
+  @Override // DatanodeProtocol, NamenodeProtocol
+  public NamespaceInfo versionRequest() throws IOException {
+    return namesystem.getNamespaceInfo();
+  }
+
+  @Override // DatanodeProtocol
+  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
+    return namesystem.processDistributedUpgradeCommand(comm);
+  }
+
+  /** 
+   * Verify a request from a registered node.
+   * 
+   * Verifies that the node's layout version and registration ID match
+   * the values expected by this namenode.
+   * 
+   * @param nodeReg node registration
+   * @throws IOException if the layout version or registration ID is invalid
+   */
+  void verifyRequest(NodeRegistration nodeReg) throws IOException {
+    verifyVersion(nodeReg.getVersion());
+    if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
+      LOG.warn("Invalid registrationID - expected: "
+          + namesystem.getRegistrationID() + " received: "
+          + nodeReg.getRegistrationID());
+      throw new UnregisteredNodeException(nodeReg);
+    }
+  }
+    
+
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshServiceAcl() throws IOException {
+    if (!serviceAuthEnabled) {
+      throw new AuthorizationException("Service Level Authorization not enabled!");
+    }
+
+    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    if (this.serviceRpcServer != null) {
+      this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    }
+  }
+
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshUserToGroupsMappings() throws IOException {
+    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + 
+             UserGroupInformation.getCurrentUser().getShortUserName());
+    Groups.getUserToGroupsMappingService().refresh();
+  }
+
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshSuperUserGroupsConfiguration() {
+    LOG.info("Refreshing SuperUser proxy group mapping list ");
+
+    ProxyUsers.refreshSuperUserGroupsConfiguration();
+  }
+  
+  @Override // GetUserMappingsProtocol
+  public String[] getGroupsForUser(String user) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting groups for user " + user);
+    }
+    return UserGroupInformation.createRemoteUser(user).getGroupNames();
+  }
+
+
+  /**
+   * Verify that the reported layout version matches this namenode's.
+   * 
+   * @param version layout version reported by the node
+   * @throws IOException if the version does not match HdfsConstants.LAYOUT_VERSION
+   */
+  void verifyVersion(int version) throws IOException {
+    if (version != HdfsConstants.LAYOUT_VERSION)
+      throw new IncorrectVersionException(version, "data node");
+  }
+
+  private static String getClientMachine() {
+    String clientMachine = Server.getRemoteAddress();
+    if (clientMachine == null) {
+      clientMachine = "";
+    }
+    return clientMachine;
+  }
+}
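
The hunks below update callers, both in the namenode code and in the tests, to reach these operations through the new RPC server object instead of invoking protocol methods on NameNode directly. A minimal caller-side sketch of that pattern, assuming only names that appear in this commit (the wrapper class, helper method names, and path argument are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;

    // Illustrative sketch only -- not part of this commit.
    class RpcServerCallerSketch {
      // Server-side callers such as NamenodeFsck now go through getRpcServer().
      static HdfsFileStatus statViaRpc(NameNode namenode, String path) throws IOException {
        NamenodeProtocols nnRpc = namenode.getRpcServer();
        return nnRpc.getFileInfo(path);               // was namenode.getFileInfo(path)
      }

      // Tests use the new MiniDFSCluster accessor (single-namenode clusters only).
      static long[] statsViaRpc(MiniDFSCluster cluster) throws IOException {
        return cluster.getNameNodeRpc().getStats();   // was cluster.getNameNode().getStats()
      }
    }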

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Sep 12 18:59:42 2011
@@ -173,7 +173,7 @@ public class NamenodeFsck {
       out.println(msg);
       namenode.getNamesystem().logFsckEvent(path, remoteAddress);
 
-      final HdfsFileStatus file = namenode.getFileInfo(path);
+      final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
       if (file != null) {
 
         if (showCorruptFileBlocks) {
@@ -250,7 +250,8 @@ public class NamenodeFsck {
       res.totalDirs++;
       do {
         assert lastReturnedName != null;
-        thisListing = namenode.getListing(path, lastReturnedName, false);
+        thisListing = namenode.getRpcServer().getListing(
+            path, lastReturnedName, false);
         if (thisListing == null) {
           return;
         }
@@ -385,7 +386,7 @@ public class NamenodeFsck {
         break;
       case FIXING_DELETE:
         if (!isOpen)
-          namenode.delete(path, true);
+          namenode.getRpcServer().delete(path, true);
       }
     }
     if (showFiles) {
@@ -414,7 +415,8 @@ public class NamenodeFsck {
     String target = lostFound + fullName;
     String errmsg = "Failed to move " + fullName + " to /lost+found";
     try {
-      if (!namenode.mkdirs(target, file.getPermission(), true)) {
+      if (!namenode.getRpcServer().mkdirs(
+          target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         return;
       }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Mon Sep 12 18:59:42 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -354,7 +355,7 @@ class NamenodeJspHelper {
     }
   }
 
-  static String getDelegationToken(final NameNode nn,
+  static String getDelegationToken(final NamenodeProtocols nn,
       HttpServletRequest request, Configuration conf,
       final UserGroupInformation ugi) throws IOException, InterruptedException {
     Token<DelegationTokenIdentifier> token = ugi
@@ -381,7 +382,8 @@ class NamenodeJspHelper {
         .getAttribute(JspHelper.CURRENT_CONF);
     final DatanodeID datanode = getRandomDatanode(nn);
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
-    String tokenString = getDelegationToken(nn, request, conf, ugi);
+    String tokenString = getDelegationToken(
+        nn.getRpcServer(), request, conf, ugi);
     // if the user is defined, get a delegation token and stringify it
     final String redirectLocation;
     final String nodeToRedirect;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Mon Sep 12 18:59:42 2011
@@ -70,7 +70,7 @@ public class RenewDelegationTokenServlet
     try {
       long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
         public Long run() throws Exception {
-          return nn.renewDelegationToken(token);
+          return nn.getRpcServer().renewDelegationToken(token);
         }
       });
       PrintStream os = new PrintStream(resp.getOutputStream());

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Sep 12 18:59:42 2011
@@ -1026,6 +1026,14 @@ public class MiniDFSCluster {
   }
   
   /**
+   * Get an instance of the NameNode's RPC handler.
+   */
+  public NamenodeProtocols getNameNodeRpc() {
+    checkSingleNameNode();
+    return getNameNode(0).getRpcServer();
+  }
+  
+  /**
    * Gets the NameNode for the index.  May be null.
    */
   public NameNode getNameNode(int nnIndex) {
@@ -1361,7 +1369,15 @@ public class MiniDFSCluster {
     if (nameNode == null) {
       return false;
     }
-    long[] sizes = nameNode.getStats();
+    long[] sizes;
+    try {
+      sizes = nameNode.getRpcServer().getStats();
+    } catch (IOException ioe) {
+      // This method above should never throw.
+      // It only throws IOE since it is exposed via RPC
+      throw new AssertionError("Unexpected IOE thrown: "
+          + StringUtils.stringifyException(ioe));
+    }
     boolean isUp = false;
     synchronized (this) {
       isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0);
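The change above is representative: methods reached through the NamenodeProtocols interface declare IOException because they are RPC-facing, so in-process test callers wrap the call and treat an actual exception as a test failure. A minimal sketch of that wrapping pattern, assuming the same classes used above (the helper class and method names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.util.StringUtils;

    // Illustrative sketch only -- not part of this commit.
    class ClusterStatsSketch {
      // getStats() declares IOException only because it is exposed via RPC;
      // an in-process call should never actually throw it.
      static long[] statsOrFail(MiniDFSCluster cluster) {
        try {
          return cluster.getNameNodeRpc().getStats();
        } catch (IOException ioe) {
          throw new AssertionError("Unexpected IOE thrown: "
              + StringUtils.stringifyException(ioe));
        }
      }
    }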

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Mon Sep 12 18:59:42 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 
@@ -45,7 +46,7 @@ public class TestClientProtocolForPipeli
     try {
       cluster.waitActive();
       FileSystem fileSys = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
 
       /* Test writing to finalized replicas */
       Path file = new Path("dataprotocol.dat");    

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java Mon Sep 12 18:59:42 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Client;
@@ -190,7 +191,7 @@ public class TestDFSClientRetries extend
     final int maxRetries = 1; // Allow one retry (total of two calls)
     conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
     
-    NameNode mockNN = mock(NameNode.class);
+    NamenodeProtocols mockNN = mock(NamenodeProtocols.class);
     Answer<Object> answer = new ThrowsException(new IOException()) {
       int retryCount = 0;
       
@@ -240,8 +241,8 @@ public class TestDFSClientRetries extend
     try {
       cluster.waitActive();
       FileSystem fs = cluster.getFileSystem();
-      NameNode preSpyNN = cluster.getNameNode();
-      NameNode spyNN = spy(preSpyNN);
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
       DFSClient client = new DFSClient(null, spyNN, conf, null);
       int maxBlockAcquires = client.getMaxBlockAcquireFailures();
       assertTrue(maxBlockAcquires > 0);
@@ -305,11 +306,11 @@ public class TestDFSClientRetries extend
    */
   private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
     private int failuresLeft;
-    private NameNode realNN;
+    private NamenodeProtocols realNN;
 
-    public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+    public FailNTimesAnswer(NamenodeProtocols preSpyNN, int timesToFail) {
       failuresLeft = timesToFail;
-      this.realNN = realNN;
+      this.realNN = preSpyNN;
     }
 
     public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
@@ -603,7 +604,8 @@ public class TestDFSClientRetries extend
 
       //stop the first datanode
       final List<LocatedBlock> locatedblocks = DFSClient.callGetBlockLocations(
-          cluster.getNameNode(), f, 0, Long.MAX_VALUE).getLocatedBlocks();
+          cluster.getNameNodeRpc(), f, 0, Long.MAX_VALUE)
+            .getLocatedBlocks();
       final DatanodeInfo first = locatedblocks.get(0).getLocations()[0];
       cluster.stopDataNode(first.getName());
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java Mon Sep 12 18:59:42 2011
@@ -293,10 +293,11 @@ public class TestDecommission {
   }
   
   private void verifyStats(NameNode namenode, FSNamesystem fsn,
-      DatanodeInfo node, boolean decommissioning) throws InterruptedException {
+      DatanodeInfo node, boolean decommissioning)
+      throws InterruptedException, IOException {
     // Do the stats check over 10 iterations
     for (int i = 0; i < 10; i++) {
-      long[] newStats = namenode.getStats();
+      long[] newStats = namenode.getRpcServer().getStats();
 
       // For decommissioning nodes, ensure capacity of the DN is no longer
       // counted. Only used space of the DN is counted in cluster capacity

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java Mon Sep 12 18:59:42 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Before;
@@ -151,8 +151,8 @@ public class TestFileAppend4 {
  
     try {
       cluster.waitActive();
-      NameNode preSpyNN = cluster.getNameNode();
-      NameNode spyNN = spy(preSpyNN);
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
  
       // Delay completeFile
       GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
@@ -222,8 +222,8 @@ public class TestFileAppend4 {
  
     try {
       cluster.waitActive();
-      NameNode preSpyNN = cluster.getNameNode();
-      NameNode spyNN = spy(preSpyNN);
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
  
       // Delay completeFile
       GenericTestUtils.DelayAnswer delayer =

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java Mon Sep 12 18:59:42 2011
@@ -420,7 +420,7 @@ public class TestFileCreation extends ju
       final Path f = new Path("/foo.txt");
       createFile(dfs, f, 3);
       try {
-        cluster.getNameNode().addBlock(f.toString(), 
+        cluster.getNameNodeRpc().addBlock(f.toString(), 
             client.clientName, null, null);
         fail();
       } catch(IOException ioe) {

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java Mon Sep 12 18:59:42 2011
@@ -106,7 +106,7 @@ public class TestLeaseRecovery extends j
 
 
       DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
-      cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+      cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
 
       // expire lease to trigger block recovery.
       waitLeaseRecovery(cluster);
@@ -129,14 +129,14 @@ public class TestLeaseRecovery extends j
       filestr = "/foo.safemode";
       filepath = new Path(filestr);
       dfs.create(filepath, (short)1);
-      cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+      cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
       assertTrue(dfs.dfs.exists(filestr));
       DFSTestUtil.waitReplication(dfs, filepath, (short)1);
       waitLeaseRecovery(cluster);
       // verify that we still cannot recover the lease
       LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
       assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1);
-      cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java Mon Sep 12 18:59:42 2011
@@ -100,7 +100,7 @@ public class TestPipelines {
     ofs.writeBytes("Some more stuff to write");
     ((DFSOutputStream) ofs.getWrappedStream()).hflush();
 
-    List<LocatedBlock> lb = cluster.getNameNode().getBlockLocations(
+    List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
       filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
 
     String bpid = cluster.getNamesystem().getBlockPoolId();

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java Mon Sep 12 18:59:42 2011
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 
 /**
  * This class defines a number of static helper methods used by the
@@ -121,7 +121,7 @@ public class UpgradeUtilities {
                                    .manageNameDfsDirs(false)
                                    .build();
         
-      NameNode namenode = cluster.getNameNode();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
       namenodeStorageNamespaceID = namenode.versionRequest().getNamespaceID();
       namenodeStorageFsscTime = namenode.versionRequest().getCTime();
       namenodeStorageClusterID = namenode.versionRequest().getClusterID();
@@ -517,7 +517,7 @@ public class UpgradeUtilities {
    */
   public static int getCurrentNamespaceID(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getNamespaceID();
+      return cluster.getNameNodeRpc().versionRequest().getNamespaceID();
     }
     return namenodeStorageNamespaceID;
   }
@@ -528,7 +528,7 @@ public class UpgradeUtilities {
    */
   public static String getCurrentClusterID(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getClusterID();
+      return cluster.getNameNodeRpc().versionRequest().getClusterID();
     }
     return namenodeStorageClusterID;
   }
@@ -539,7 +539,7 @@ public class UpgradeUtilities {
    */
   public static String getCurrentBlockPoolID(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getBlockPoolID();
+      return cluster.getNameNodeRpc().versionRequest().getBlockPoolID();
     }
     return namenodeStorageBlockPoolID;
   }
@@ -554,7 +554,7 @@ public class UpgradeUtilities {
    */
   public static long getCurrentFsscTime(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getCTime();
+      return cluster.getNameNodeRpc().versionRequest().getCTime();
     }
     return namenodeStorageFsscTime;
   }
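
UpgradeUtilities now issues its versionRequest() calls against the same NamenodeProtocols handle. A compact sketch, assuming a running MiniDFSCluster (the helper name is hypothetical), showing that one versionRequest() round trip already carries every field the utility methods above expose:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    class StorageVersionSketch {
      // Hypothetical helper: a single versionRequest() supplies the namespace ID,
      // cluster ID, block pool ID and cTime returned by the getters above.
      static NamespaceInfo currentVersion(MiniDFSCluster cluster) throws IOException {
        NamenodeProtocols nn = cluster.getNameNodeRpc();
        return nn.versionRequest();
      }
    }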

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Mon Sep 12 18:59:42 2011
@@ -375,11 +375,11 @@ public class TestBlockToken {
       Path filePath = new Path(fileName);
       FSDataOutputStream out = fs.create(filePath, (short) 1);
       out.write(new byte[1000]);
-      LocatedBlocks locatedBlocks = cluster.getNameNode().getBlockLocations(
+      LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
           fileName, 0, 1000);
       while (locatedBlocks.getLastLocatedBlock() == null) {
         Thread.sleep(100);
-        locatedBlocks = cluster.getNameNode().getBlockLocations(fileName, 0,
+        locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(fileName, 0,
             1000);
       }
       Token<BlockTokenIdentifier> token = locatedBlocks.getLastLocatedBlock()

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Mon Sep 12 18:59:42 2011
@@ -89,7 +89,7 @@ public class TestBalancerWithMultipleNam
       this.cluster = cluster;
       clients = new ClientProtocol[nNameNodes];
       for(int i = 0; i < nNameNodes; i++) {
-        clients[i] = cluster.getNameNode(i);
+        clients[i] = cluster.getNameNode(i).getRpcServer();
       }
       replication = (short)Math.max(1, nDataNodes - 1);
     }
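
In TestBalancerWithMultipleNameNodes the handles are stored in a ClientProtocol[] array, which works because the NamenodeProtocols interface returned by NameNode.getRpcServer() extends ClientProtocol. A small sketch of that wiring, assuming a multi-namenode MiniDFSCluster (the helper is illustrative only):

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    class MultiNameNodeClientsSketch {
      // Hypothetical helper: one ClientProtocol handle per namenode, built the
      // same way the balancer test now builds them via getRpcServer().
      static ClientProtocol[] clientsFor(MiniDFSCluster cluster, int nNameNodes) {
        ClientProtocol[] clients = new ClientProtocol[nNameNodes];
        for (int i = 0; i < nNameNodes; i++) {
          clients[i] = cluster.getNameNode(i).getRpcServer();
        }
        return clients;
      }
    }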

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Mon Sep 12 18:59:42 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -314,6 +315,7 @@ public class TestBlockTokenWithDFS {
       assertEquals(numDataNodes, cluster.getDataNodes().size());
 
       final NameNode nn = cluster.getNameNode();
+      final NamenodeProtocols nnProto = nn.getRpcServer();
       final BlockManager bm = nn.getNamesystem().getBlockManager();
       final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
 
@@ -344,7 +346,7 @@ public class TestBlockTokenWithDFS {
 
       new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
-      List<LocatedBlock> locatedBlocks = nn.getBlockLocations(
+      List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block
       Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Mon Sep 12 18:59:42 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -139,7 +138,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     List<LocatedBlock> blocksAfterReport =
@@ -181,9 +180,10 @@ public class TestBlockReport {
 
     List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
     List<Integer> removedIndex = new ArrayList<Integer>();
-    List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
-      filePath.toString(), FILE_START,
-      FILE_SIZE).getLocatedBlocks();
+    List<LocatedBlock> lBlocks =
+      cluster.getNameNodeRpc().getBlockLocations(
+          filePath.toString(), FILE_START,
+          FILE_SIZE).getLocatedBlocks();
 
     while (removedIndex.size() != 2) {
       int newRemoveIndex = rand.nextInt(lBlocks.size());
@@ -218,7 +218,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
@@ -258,7 +258,8 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
+    DatanodeCommand dnCmd =
+      cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Got the command: " + dnCmd);
@@ -310,7 +311,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
@@ -359,7 +360,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of Corrupted blocks",
@@ -381,7 +382,7 @@ public class TestBlockReport {
       LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
     }
     
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
 
@@ -431,7 +432,7 @@ public class TestBlockReport {
       DataNode dn = cluster.getDataNodes().get(DN_N1);
       String poolId = cluster.getNamesystem().getBlockPoolId();
       DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-      cluster.getNameNode().blockReport(dnR, poolId,
+      cluster.getNameNodeRpc().blockReport(dnR, poolId,
           new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
@@ -477,7 +478,7 @@ public class TestBlockReport {
       DataNode dn = cluster.getDataNodes().get(DN_N1);
       String poolId = cluster.getNamesystem().getBlockPoolId();
       DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-      cluster.getNameNode().blockReport(dnR, poolId,
+      cluster.getNameNodeRpc().blockReport(dnR, poolId,
           new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
@@ -590,7 +591,7 @@ public class TestBlockReport {
     DFSTestUtil.createFile(fs, filePath, fileSize,
       REPL_FACTOR, rand.nextLong());
 
-    return locatedToBlocks(cluster.getNameNode()
+    return locatedToBlocks(cluster.getNameNodeRpc()
       .getBlockLocations(filePath.toString(), FILE_START,
         fileSize).getLocatedBlocks(), null);
   }
@@ -707,7 +708,8 @@ public class TestBlockReport {
   private Block findBlock(Path path, long size) throws IOException {
     Block ret;
       List<LocatedBlock> lbs =
-        cluster.getNameNode().getBlockLocations(path.toString(),
+        cluster.getNameNodeRpc()
+        .getBlockLocations(path.toString(),
           FILE_START, size).getLocatedBlocks();
       LocatedBlock lb = lbs.get(lbs.size() - 1);
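
TestBlockReport repeatedly sends block reports on behalf of a datanode, and each of those calls now goes through the RPC facade as well. A minimal sketch of the reporting step, assuming a running MiniDFSCluster and a populated block list (the helper and its parameters are illustrative):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

    class BlockReportSketch {
      // Hypothetical helper: report the given finalized blocks for one datanode
      // through cluster.getNameNodeRpc(), as the tests above now do.
      static void report(MiniDFSCluster cluster, DataNode dn, List<Block> blocks)
          throws IOException {
        String poolId = cluster.getNamesystem().getBlockPoolId();
        DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
        cluster.getNameNodeRpc().blockReport(dnR, poolId,
            new BlockListAsLongs(blocks, null).getBlockListAsLongs());
      }
    }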
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Sep 12 18:59:42 2011
@@ -40,8 +40,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 
@@ -144,7 +144,7 @@ public class TestDataNodeVolumeFailure {
     String bpid = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
     long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
-    cluster.getNameNode().blockReport(dnR, bpid, bReport);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);
@@ -216,7 +216,7 @@ public class TestDataNodeVolumeFailure {
    * @throws IOException
    */
   private void triggerFailure(String path, long size) throws IOException {
-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     List<LocatedBlock> locatedBlocks =
       nn.getBlockLocations(path, 0, size).getLocatedBlocks();
     
@@ -291,7 +291,7 @@ public class TestDataNodeVolumeFailure {
     throws IOException {
     int total = 0;
     
-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     List<LocatedBlock> locatedBlocks = 
       nn.getBlockLocations(path, 0, size).getLocatedBlocks();
     //System.out.println("Number of blocks: " + locatedBlocks.size()); 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Mon Sep 12 18:59:42 2011
@@ -109,7 +109,7 @@ public class TestTransferRbw {
         
         final DatanodeInfo oldnodeinfo;
         {
-          final DatanodeInfo[] datatnodeinfos = cluster.getNameNode(
+          final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc(
               ).getDatanodeReport(DatanodeReportType.LIVE);
           Assert.assertEquals(2, datatnodeinfos.length);
           int i = 0;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Mon Sep 12 18:59:42 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -100,6 +101,7 @@ public class NNThroughputBenchmark {
 
   static Configuration config;
   static NameNode nameNode;
+  static NamenodeProtocols nameNodeProto;
 
   NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
     config = conf;
@@ -119,6 +121,7 @@ public class NNThroughputBenchmark {
     // Start the NameNode
     String[] argv = new String[] {};
     nameNode = NameNode.createNameNode(argv, config);
+    nameNodeProto = nameNode.getRpcServer();
   }
 
   void close() throws IOException {
@@ -264,9 +267,9 @@ public class NNThroughputBenchmark {
     }
 
     void cleanUp() throws IOException {
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       if(!keepResults)
-        nameNode.delete(getBaseDir(), true);
+        nameNodeProto.delete(getBaseDir(), true);
     }
 
     int getNumOpsExecuted() {
@@ -397,7 +400,7 @@ public class NNThroughputBenchmark {
     void benchmarkOne() throws IOException {
       for(int idx = 0; idx < opsPerThread; idx++) {
         if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
-          nameNode.refreshUserToGroupsMappings();
+          nameNodeProto.refreshUserToGroupsMappings();
         long stat = statsOp.executeOp(daemonId, idx, arg1);
         localNumOpsExecuted++;
         localCumulativeTime += stat;
@@ -458,9 +461,9 @@ public class NNThroughputBenchmark {
      */
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       long start = System.currentTimeMillis();
-      nameNode.delete(BASE_DIR_NAME, true);
+      nameNodeProto.delete(BASE_DIR_NAME, true);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -522,7 +525,7 @@ public class NNThroughputBenchmark {
 
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       // int generatedFileIdx = 0;
       LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
       fileNames = new String[numThreads][];
@@ -554,12 +557,12 @@ public class NNThroughputBenchmark {
     throws IOException {
       long start = System.currentTimeMillis();
       // dummyActionNoSynch(fileIdx);
-      nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+      nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
               .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
-        written = nameNode.complete(fileNames[daemonId][inputIdx],
+        written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
                                     clientName, null));
       return end-start;
     }
@@ -627,11 +630,11 @@ public class NNThroughputBenchmark {
       }
       // use the same files for open
       super.generateInputs(opsPerThread);
-      if(nameNode.getFileInfo(opCreate.getBaseDir()) != null
-          && nameNode.getFileInfo(getBaseDir()) == null) {
-        nameNode.rename(opCreate.getBaseDir(), getBaseDir());
+      if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
+          && nameNodeProto.getFileInfo(getBaseDir()) == null) {
+        nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
       }
-      if(nameNode.getFileInfo(getBaseDir()) == null) {
+      if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
         throw new IOException(getBaseDir() + " does not exist.");
       }
     }
@@ -642,7 +645,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+      nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -670,7 +673,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.delete(fileNames[daemonId][inputIdx], false);
+      nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -698,7 +701,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.getFileInfo(fileNames[daemonId][inputIdx]);
+      nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -740,7 +743,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.rename(fileNames[daemonId][inputIdx],
+      nameNodeProto.rename(fileNames[daemonId][inputIdx],
                       destNames[daemonId][inputIdx]);
       long end = System.currentTimeMillis();
       return end-start;
@@ -787,11 +790,11 @@ public class NNThroughputBenchmark {
 
     void register() throws IOException {
       // get versions from the namenode
-      nsInfo = nameNode.versionRequest();
+      nsInfo = nameNodeProto.versionRequest();
       dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
       DataNode.setNewStorageID(dnRegistration);
       // register datanode
-      dnRegistration = nameNode.registerDatanode(dnRegistration);
+      dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
     }
 
     /**
@@ -801,7 +804,7 @@ public class NNThroughputBenchmark {
     void sendHeartbeat() throws IOException {
       // register datanode
       // TODO:FEDERATION currently a single block pool is supported
-      DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
@@ -846,7 +849,7 @@ public class NNThroughputBenchmark {
     int replicateBlocks() throws IOException {
       // register datanode
       // TODO:FEDERATION currently a single block pool is supported
-      DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
@@ -877,7 +880,7 @@ public class NNThroughputBenchmark {
           receivedDNReg.setStorageInfo(
                           new DataStorage(nsInfo, dnInfo.getStorageID()));
           receivedDNReg.setInfoPort(dnInfo.getInfoPort());
-          nameNode.blockReceived( receivedDNReg, 
+          nameNodeProto.blockReceived( receivedDNReg, 
                                   nameNode.getNamesystem().getBlockPoolId(),
                                   new Block[] {blocks[i]},
                                   new String[] {DataNode.EMPTY_DEL_HINT});
@@ -968,14 +971,14 @@ public class NNThroughputBenchmark {
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
       String clientName = getClientName(007);
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNode.create(fileName, FsPermission.getDefault(), clientName,
+        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
             BLOCK_SIZE);
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNode.complete(fileName, clientName, lastBlock);
+        nameNodeProto.complete(fileName, clientName, lastBlock);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -987,12 +990,12 @@ public class NNThroughputBenchmark {
     throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock, null);
+        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
           datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
-          nameNode.blockReceived(
+          nameNodeProto.blockReceived(
               datanodes[dnIdx].dnRegistration, 
               loc.getBlock().getBlockPoolId(),
               new Block[] {loc.getBlock().getLocalBlock()},
@@ -1013,7 +1016,7 @@ public class NNThroughputBenchmark {
       assert daemonId < numThreads : "Wrong daemonId.";
       TinyDatanode dn = datanodes[daemonId];
       long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration, nameNode.getNamesystem()
+      nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
           .getBlockPoolId(), dn.getBlockReportList());
       long end = System.currentTimeMillis();
       return end-start;
@@ -1146,7 +1149,7 @@ public class NNThroughputBenchmark {
         LOG.info("Datanode " + dn.getName() + " is decommissioned.");
       }
       excludeFile.close();
-      nameNode.refreshNodes();
+      nameNodeProto.refreshNodes();
     }
 
     /**
@@ -1160,8 +1163,8 @@ public class NNThroughputBenchmark {
       assert daemonId < numThreads : "Wrong daemonId.";
       long start = System.currentTimeMillis();
       // compute data-node work
-      int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
-          .getNamesystem().getBlockManager());
+      int work = BlockManagerTestUtil.getComputedDatanodeWork(
+          nameNode.getNamesystem().getBlockManager());
       long end = System.currentTimeMillis();
       numPendingBlocks += work;
       if(work == 0)
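
NNThroughputBenchmark keeps both handles as static fields: the NameNode itself for namesystem access (for example getNamesystem().getBlockManager() above) and a cached NamenodeProtocols handle for every protocol operation. A condensed sketch of that start-up wiring, assuming the same configuration object the benchmark already builds (the class and method names here are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;

    class BenchmarkWiringSketch {
      static NameNode nameNode;                // still used for namesystem access
      static NamenodeProtocols nameNodeProto;  // used for all RPC-protocol calls

      static void start(Configuration config) throws IOException {
        nameNode = NameNode.createNameNode(new String[] {}, config);
        nameNodeProto = nameNode.getRpcServer();
        // protocol operations, e.g. leaving safe mode, go through the facade
        nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
      }
    }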

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Mon Sep 12 18:59:42 2011
@@ -47,14 +47,6 @@ public class NameNodeAdapter {
         src, offset, length, false, true);
   }
 
-  /**
-   * Get the internal RPC server instance.
-   * @return rpc server
-   */
-  public static Server getRpcServer(NameNode namenode) {
-    return namenode.server;
-  }
-
   public static DelegationTokenSecretManager getDtSecretManager(
       final FSNamesystem ns) {
     return ns.getDelegationTokenSecretManager();

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Mon Sep 12 18:59:42 2011
@@ -239,10 +239,10 @@ public class OfflineEditsViewerHelper {
         LOG.info("Innocuous exception", e);
       }
       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
-          cluster.getNameNode(), filePath, 0L, bytes.length);
+          cluster.getNameNodeRpc(), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
 
     // Force a roll so we get an OP_END_LOG_SEGMENT txn
-    return cluster.getNameNode().rollEditLog();
+    return cluster.getNameNodeRpc().rollEditLog();
   }
 }


