Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1169868&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Sep 12 18:59:42 2011
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+
+/**
+ * This class is responsible for handling all of the RPC calls to the NameNode.
+ * It is created, started, and stopped by {@link NameNode}.
+ */
+class NameNodeRpcServer implements NamenodeProtocols {
+
+ private static final Log LOG = NameNode.LOG;
+ private static final Log stateChangeLog = NameNode.stateChangeLog;
+
+ // Dependencies from other parts of NN.
+ private final FSNamesystem namesystem;
+ protected final NameNode nn;
+ private final NameNodeMetrics metrics;
+
+ private final boolean serviceAuthEnabled;
+
+ /** The RPC server that listens to requests from DataNodes */
+ private final RPC.Server serviceRpcServer;
+ private final InetSocketAddress serviceRPCAddress;
+
+ /** The RPC server that listens to requests from clients */
+ protected final RPC.Server server;
+ protected final InetSocketAddress rpcAddress;
+
+  /**
+   * Creates the client-facing RPC server and, when a service RPC address is
+   * configured, a second RPC server bound to that address. Requests are only
+   * served once {@link #start()} has been called.
+   *
+   * @param conf configuration supplying the handler counts and the
+   *          service-level authorization switch
+   * @param nn the owning NameNode; it supplies the addresses to bind to and
+   *          is told the addresses the servers actually listen on
+   * @throws IOException propagated from address lookup, RPC server creation
+   *           or the ACL refresh
+   */
+  public NameNodeRpcServer(Configuration conf, NameNode nn)
+      throws IOException {
+    this.nn = nn;
+    this.namesystem = nn.getNamesystem();
+    this.metrics = NameNode.getNameNodeMetrics();
+
+    // The client RPC server belongs to the NameNode, so its handler pool is
+    // sized from the NameNode handler key rather than the DataNode one.
+    int handlerCount =
+        conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
+            DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
+    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
+
+    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
+    if (dnSocketAddr != null) {
+      int serviceHandlerCount =
+          conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
+              DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
+      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
+          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
+          false, conf, namesystem.getDelegationTokenSecretManager());
+      // Record the address the server really bound to and publish it.
+      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
+      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
+    } else {
+      serviceRpcServer = null;
+      serviceRPCAddress = null;
+    }
+    this.server = RPC.getServer(NamenodeProtocols.class, this,
+        socAddr.getHostName(), socAddr.getPort(),
+        handlerCount, false, conf,
+        namesystem.getDelegationTokenSecretManager());
+
+    // set service-level authorization security policy
+    this.serviceAuthEnabled = conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
+    if (serviceAuthEnabled) {
+      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      if (this.serviceRpcServer != null) {
+        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      }
+    }
+
+    // The rpc-server port can be ephemeral... ensure we have the correct info
+    this.rpcAddress = this.server.getListenerAddress();
+    nn.setRpcServerAddress(conf, rpcAddress);
+  }
+
+  /**
+   * Begins serving requests: the client RPC server is started first and
+   * then, if one was created, the service RPC server.
+   */
+  void start() {
+    server.start();
+    if (serviceRpcServer == null) {
+      return;
+    }
+    serviceRpcServer.start();
+  }
+
+ /**
+ * Wait until the RPC server has shut down.
+ * Only the client RPC server is joined here; the service RPC server is not.
+ *
+ * @throws InterruptedException propagated from {@code server.join()}
+ */
+ void join() throws InterruptedException {
+ this.server.join();
+ }
+
+  /**
+   * Stops the client RPC server and then the service RPC server, skipping
+   * whichever of the two is null.
+   */
+  void stop() {
+    if (server != null) {
+      server.stop();
+    }
+    if (serviceRpcServer != null) {
+      serviceRpcServer.stop();
+    }
+  }
+
+ /**
+ * @return the listener address of the service RPC server, or null when no
+ * service RPC server was created
+ */
+ InetSocketAddress getServiceRpcAddress() {
+ return serviceRPCAddress;
+ }
+
+ /** @return the listener address of the client RPC server */
+ InetSocketAddress getRpcAddress() {
+ return rpcAddress;
+ }
+
+ /**
+ * Delegates to the static {@code ProtocolSignature.getProtocolSignature}
+ * helper, passing this server as the protocol implementation.
+ */
+ @Override // VersionedProtocol
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
+  /**
+   * Maps a protocol class name to the version id of that protocol.
+   *
+   * @param protocol fully qualified class name of the requested protocol
+   * @param clientVersion version the client speaks (not consulted here)
+   * @return the {@code versionID} of the matching protocol interface
+   * @throws IOException if the name matches none of the protocols served
+   */
+  @Override
+  public long getProtocolVersion(String protocol,
+      long clientVersion) throws IOException {
+    if (protocol.equals(ClientProtocol.class.getName())) {
+      return ClientProtocol.versionID;
+    }
+    if (protocol.equals(DatanodeProtocol.class.getName())) {
+      return DatanodeProtocol.versionID;
+    }
+    if (protocol.equals(NamenodeProtocol.class.getName())) {
+      return NamenodeProtocol.versionID;
+    }
+    if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())) {
+      return RefreshAuthorizationPolicyProtocol.versionID;
+    }
+    if (protocol.equals(RefreshUserMappingsProtocol.class.getName())) {
+      return RefreshUserMappingsProtocol.versionID;
+    }
+    if (protocol.equals(GetUserMappingsProtocol.class.getName())) {
+      return GetUserMappingsProtocol.versionID;
+    }
+    throw new IOException("Unknown protocol to name node: " + protocol);
+  }
+
+ /////////////////////////////////////////////////////
+ // NamenodeProtocol
+ /////////////////////////////////////////////////////
+  /**
+   * Returns blocks, with their locations, from the block manager for the
+   * given datanode.
+   *
+   * @param datanode the datanode whose blocks are requested
+   * @param size requested size; must be strictly positive
+   * @throws IllegalArgumentException if {@code size} is zero or negative
+   */
+  @Override // NamenodeProtocol
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+      throws IOException {
+    final boolean positive = size > 0;
+    if (!positive) {
+      throw new IllegalArgumentException(
+          "Unexpected not positive size: " + size);
+    }
+    return namesystem.getBlockManager().getBlocks(datanode, size);
+  }
+
+ /** Returns the block keys held by the namesystem's block manager. */
+ @Override // NamenodeProtocol
+ public ExportedBlockKeys getBlockKeys() throws IOException {
+ return namesystem.getBlockManager().getBlockKeys();
+ }
+
+  /**
+   * Handles an error report sent by another namenode. The request is
+   * verified and the message logged; when the error code is {@code FATAL}
+   * the reporting node is released as a backup node.
+   */
+  @Override // NamenodeProtocol
+  public void errorReport(NamenodeRegistration registration,
+      int errorCode,
+      String msg) throws IOException {
+    verifyRequest(registration);
+    LOG.info("Error report from " + registration + ": " + msg);
+    final boolean fatal = (errorCode == FATAL);
+    if (fatal) {
+      namesystem.releaseBackupNode(registration);
+    }
+  }
+
+ /**
+ * Registers another namenode with this NameNode.
+ * The caller's version is checked first; this NameNode's own registration
+ * is then obtained from {@code nn.setRegistration()} and passed, together
+ * with the caller's registration, to
+ * {@code namesystem.registerBackupNode}.
+ *
+ * @param registration the registration presented by the calling node
+ * @return this NameNode's own registration
+ * @throws IOException propagated from the version check or the namesystem
+ */
+ @Override // NamenodeProtocol
+ public NamenodeRegistration register(NamenodeRegistration registration)
+ throws IOException {
+ verifyVersion(registration.getVersion());
+ NamenodeRegistration myRegistration = nn.setRegistration();
+ namesystem.registerBackupNode(registration, myRegistration);
+ return myRegistration;
+ }
+
+  /**
+   * Starts a checkpoint on behalf of the given registered node.
+   * The request is verified first and is refused unless this node has the
+   * {@code NAMENODE} role.
+   */
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+      throws IOException {
+    verifyRequest(registration);
+    if (nn.isRole(NamenodeRole.NAMENODE)) {
+      return namesystem.startCheckpoint(registration, nn.setRegistration());
+    }
+    throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
+  }
+
+  /**
+   * Ends a checkpoint on behalf of the given registered node.
+   * The request is verified first and is refused unless this node has the
+   * {@code NAMENODE} role.
+   */
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+      CheckpointSignature sig) throws IOException {
+    verifyRequest(registration);
+    final boolean isNameNode = nn.isRole(NamenodeRole.NAMENODE);
+    if (!isNameNode) {
+      throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
+    }
+    namesystem.endCheckpoint(registration, sig);
+  }
+
+ /** Forwards to {@code namesystem.getDelegationToken(renewer)}. */
+ @Override // ClientProtocol
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+ throws IOException {
+ return namesystem.getDelegationToken(renewer);
+ }
+
+ /** Forwards to {@code namesystem.renewDelegationToken(token)}. */
+ @Override // ClientProtocol
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ return namesystem.renewDelegationToken(token);
+ }
+
+ /** Forwards to {@code namesystem.cancelDelegationToken(token)}. */
+ @Override // ClientProtocol
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ namesystem.cancelDelegationToken(token);
+ }
+
+ /**
+ * Counts the call in the metrics, then asks the namesystem for the block
+ * locations of {@code src}, passing along the address of the calling
+ * machine.
+ *
+ * @param src path of the file
+ * @param offset start of the requested range
+ * @param length length of the requested range
+ */
+ @Override // ClientProtocol
+ public LocatedBlocks getBlockLocations(String src,
+ long offset,
+ long length)
+ throws IOException {
+ metrics.incrGetBlockLocations();
+ return namesystem.getBlockLocations(getClientMachine(),
+ src, offset, length);
+ }
+
+ /** Forwards to {@code namesystem.getServerDefaults()}. */
+ @Override // ClientProtocol
+ public FsServerDefaults getServerDefaults() throws IOException {
+ return namesystem.getServerDefaults();
+ }
+
+  /**
+   * Starts creation of a file on behalf of the calling client.
+   * The path is rejected up front when it exceeds the length or depth
+   * limit; otherwise the namesystem is asked to start the file with the
+   * current caller as owner, and the create metrics are incremented.
+   *
+   * @throws IOException if the path is too long or the namesystem refuses
+   */
+  @Override // ClientProtocol
+  public void create(String src,
+      FsPermission masked,
+      String clientName,
+      EnumSetWritable<CreateFlag> flag,
+      boolean createParent,
+      short replication,
+      long blockSize) throws IOException {
+    final String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.create: file "
+          + src + " for " + clientName + " at " + clientMachine);
+    }
+    final boolean pathOk = checkPathLength(src);
+    if (!pathOk) {
+      throw new IOException("create: Pathname too long. Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    final String owner =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final PermissionStatus permissions =
+        new PermissionStatus(owner, null, masked);
+    namesystem.startFile(src, permissions, clientName, clientMachine,
+        flag.get(), createParent, replication, blockSize);
+    metrics.incrFilesCreated();
+    metrics.incrCreateFileOps();
+  }
+
+  /**
+   * Opens an existing file for append on behalf of the calling client and
+   * counts the operation in the metrics.
+   *
+   * @return whatever {@code namesystem.appendFile} returns
+   */
+  @Override // ClientProtocol
+  public LocatedBlock append(String src, String clientName)
+      throws IOException {
+    final String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.append: file "
+          + src + " for " + clientName + " at " + clientMachine);
+    }
+    final LocatedBlock lastBlock =
+        namesystem.appendFile(src, clientName, clientMachine);
+    metrics.incrFilesAppended();
+    return lastBlock;
+  }
+
+ /**
+ * Forwards to {@code namesystem.recoverLease}, adding the address of the
+ * calling machine.
+ */
+ @Override // ClientProtocol
+ public boolean recoverLease(String src, String clientName) throws IOException {
+ String clientMachine = getClientMachine();
+ return namesystem.recoverLease(src, clientName, clientMachine);
+ }
+
+ /** Forwards to {@code namesystem.setReplication(src, replication)}. */
+ @Override // ClientProtocol
+ public boolean setReplication(String src, short replication)
+ throws IOException {
+ return namesystem.setReplication(src, replication);
+ }
+
+ /** Forwards to {@code namesystem.setPermission(src, permissions)}. */
+ @Override // ClientProtocol
+ public void setPermission(String src, FsPermission permissions)
+ throws IOException {
+ namesystem.setPermission(src, permissions);
+ }
+
+ /** Forwards to {@code namesystem.setOwner(src, username, groupname)}. */
+ @Override // ClientProtocol
+ public void setOwner(String src, String username, String groupname)
+ throws IOException {
+ namesystem.setOwner(src, username, groupname);
+ }
+
+  /**
+   * Requests an additional block for a file being written.
+   * The optional array of excluded nodes is converted into a map keyed by
+   * node before being handed to the namesystem. The add-block metric is
+   * incremented only when a block is actually returned.
+   *
+   * @param excludedNodes nodes to exclude; may be null
+   * @return the block returned by the namesystem, possibly null
+   */
+  @Override // ClientProtocol
+  public LocatedBlock addBlock(String src,
+      String clientName,
+      ExtendedBlock previous,
+      DatanodeInfo[] excludedNodes)
+      throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+          + src + " for " + clientName);
+    }
+    HashMap<Node, Node> excluded = null;
+    if (excludedNodes != null) {
+      excluded = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node candidate : excludedNodes) {
+        excluded.put(candidate, candidate);
+      }
+    }
+    final LocatedBlock allocated =
+        namesystem.getAdditionalBlock(src, clientName, previous, excluded);
+    if (allocated == null) {
+      return null;
+    }
+    metrics.incrAddBlockOps();
+    return allocated;
+  }
+
+  /**
+   * Asks the namesystem for additional datanodes for the given block.
+   * The call is counted in the metrics and the optional exclude array is
+   * converted into a map keyed by node before being passed on.
+   *
+   * @param src path of the file the block belongs to
+   * @param blk the block in question
+   * @param existings datanodes passed through to the namesystem unchanged
+   * @param excludes datanodes to exclude; may be null
+   * @param numAdditionalNodes number of additional nodes requested
+   * @param clientName name of the requesting client
+   * @return whatever {@code namesystem.getAdditionalDatanode} returns
+   * @throws IOException propagated from the namesystem
+   */
+  @Override // ClientProtocol
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      // Arrays.toString tolerates a null array, whereas Arrays.asList would
+      // throw a NullPointerException for the null excludes accepted below.
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.toString(existings)
+          + ", excludes=" + Arrays.toString(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    metrics.incrGetAdditionalDatanodeOps();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
+  /**
+   * The client needs to give up on the block.
+   *
+   * @param b the block being abandoned
+   * @param src path of the file the block belongs to
+   * @param holder passed through to {@code namesystem.abandonBlock}
+   * @throws IOException if the namesystem reports that the block could not
+   *           be abandoned
+   */
+  @Override // ClientProtocol
+  public void abandonBlock(ExtendedBlock b, String src, String holder)
+      throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+          + b + " of file " + src);
+    }
+    if (!namesystem.abandonBlock(b, src, holder)) {
+      throw new IOException("Cannot abandon block during write to " + src);
+    }
+  }
+
+ /**
+ * Logs the request at debug level and forwards it to
+ * {@code namesystem.completeFile}.
+ *
+ * @return the result reported by the namesystem
+ */
+ @Override // ClientProtocol
+ public boolean complete(String src, String clientName, ExtendedBlock last)
+ throws IOException {
+ if(stateChangeLog.isDebugEnabled()) {
+ stateChangeLog.debug("*DIR* NameNode.complete: "
+ + src + " for " + clientName);
+ }
+ return namesystem.completeFile(src, clientName, last);
+ }
+
+  /**
+   * The client has detected an error on the specified located blocks
+   * and is reporting them to the server. Each reported block is marked
+   * corrupt on every one of its listed locations; no check is made here
+   * that the blocks really are corrupt.
+   */
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
+    for (LocatedBlock located : blocks) {
+      final ExtendedBlock blk = located.getBlock();
+      for (DatanodeInfo dn : located.getLocations()) {
+        namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
+      }
+    }
+  }
+
+ /** Forwards to {@code namesystem.updateBlockForPipeline(block, clientName)}. */
+ @Override // ClientProtocol
+ public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
+ throws IOException {
+ return namesystem.updateBlockForPipeline(block, clientName);
+ }
+
+
+ /** Forwards all arguments unchanged to {@code namesystem.updatePipeline}. */
+ @Override // ClientProtocol
+ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
+ ExtendedBlock newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+ }
+
+ /**
+ * Forwards all arguments unchanged to
+ * {@code namesystem.commitBlockSynchronization}.
+ */
+ @Override // DatanodeProtocol
+ public void commitBlockSynchronization(ExtendedBlock block,
+ long newgenerationstamp, long newlength,
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
+ throws IOException {
+ namesystem.commitBlockSynchronization(block,
+ newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+ }
+
+ /** Forwards to {@code namesystem.getPreferredBlockSize(filename)}. */
+ @Override // ClientProtocol
+ public long getPreferredBlockSize(String filename)
+ throws IOException {
+ return namesystem.getPreferredBlockSize(filename);
+ }
+
+  /**
+   * Renames {@code src} to {@code dst}.
+   * The destination is rejected when it exceeds the path length or depth
+   * limit. The rename metric is incremented only when the namesystem
+   * reports success.
+   *
+   * @return the result reported by {@code namesystem.renameTo}
+   * @throws IOException if the destination path is too long
+   */
+  @Deprecated
+  @Override // ClientProtocol
+  public boolean rename(String src, String dst) throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    }
+    final boolean dstOk = checkPathLength(dst);
+    if (!dstOk) {
+      throw new IOException("rename: Pathname too long. Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    final boolean renamed = namesystem.renameTo(src, dst);
+    if (renamed) {
+      metrics.incrFilesRenamed();
+    }
+    return renamed;
+  }
+
+ /** Forwards to {@code namesystem.concat(trg, src)}. */
+ @Override // ClientProtocol
+ public void concat(String trg, String[] src) throws IOException {
+ namesystem.concat(trg, src);
+ }
+
+  /**
+   * Renames {@code src} to {@code dst} with the given rename options.
+   * The destination is rejected when it exceeds the path length or depth
+   * limit; the rename metric is incremented once the namesystem call
+   * returns normally.
+   *
+   * @throws IOException if the destination path is too long
+   */
+  @Override // ClientProtocol
+  public void rename(String src, String dst, Options.Rename... options)
+      throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    }
+    final boolean dstOk = checkPathLength(dst);
+    if (!dstOk) {
+      throw new IOException("rename: Pathname too long. Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.renameTo(src, dst, options);
+    metrics.incrFilesRenamed();
+  }
+
+ /** Equivalent to {@code delete(src, true)}, i.e. a recursive delete. */
+ @Deprecated
+ @Override // ClientProtocol
+ public boolean delete(String src) throws IOException {
+ return delete(src, true);
+ }
+
+  /**
+   * Deletes {@code src} through the namesystem and counts the operation in
+   * the metrics when the namesystem reports success.
+   *
+   * @param recursive passed through to {@code namesystem.delete}
+   * @return the result reported by the namesystem
+   */
+  @Override // ClientProtocol
+  public boolean delete(String src, boolean recursive) throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+          + ", recursive=" + recursive);
+    }
+    final boolean deleted = namesystem.delete(src, recursive);
+    if (!deleted) {
+      return false;
+    }
+    metrics.incrDeleteFileOps();
+    return true;
+  }
+
+  /**
+   * Checks a path against the configured maxima.
+   *
+   * @param src the path string to check
+   * @return true when the string is no longer than {@code MAX_PATH_LENGTH}
+   *         characters and the path is no deeper than
+   *         {@code MAX_PATH_DEPTH}; false otherwise
+   */
+  private boolean checkPathLength(String src) {
+    // The Path is built before either test, as in the original ordering.
+    final Path parsed = new Path(src);
+    if (src.length() > MAX_PATH_LENGTH) {
+      return false;
+    }
+    return parsed.depth() <= MAX_PATH_DEPTH;
+  }
+
+  /**
+   * Creates a directory owned by the current caller.
+   * The path is rejected when it exceeds the path length or depth limit.
+   *
+   * @return the result reported by {@code namesystem.mkdirs}
+   * @throws IOException if the path is too long
+   */
+  @Override // ClientProtocol
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
+    }
+    final boolean pathOk = checkPathLength(src);
+    if (!pathOk) {
+      throw new IOException("mkdirs: Pathname too long. Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    final String owner =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final PermissionStatus permissions =
+        new PermissionStatus(owner, null, masked);
+    return namesystem.mkdirs(src, permissions, createParent);
+  }
+
+ /** Forwards to {@code namesystem.renewLease(clientName)}. */
+ @Override // ClientProtocol
+ public void renewLease(String clientName) throws IOException {
+ namesystem.renewLease(clientName);
+ }
+
+  /**
+   * Fetches a directory listing from the namesystem.
+   * When a listing is returned, the listing-operation counter is bumped and
+   * the number of entries in the partial listing is added to the metrics.
+   *
+   * @return the listing from the namesystem, or null if it returned none
+   */
+  @Override // ClientProtocol
+  public DirectoryListing getListing(String src, byte[] startAfter,
+      boolean needLocation)
+      throws IOException {
+    final DirectoryListing listing =
+        namesystem.getListing(src, startAfter, needLocation);
+    if (listing == null) {
+      return null;
+    }
+    metrics.incrGetListingOps();
+    metrics.incrFilesInGetListingOps(listing.getPartialListing().length);
+    return listing;
+  }
+
+ /**
+ * Counts the call in the metrics and returns
+ * {@code namesystem.getFileInfo(src, true)}.
+ */
+ @Override // ClientProtocol
+ public HdfsFileStatus getFileInfo(String src) throws IOException {
+ metrics.incrFileInfoOps();
+ return namesystem.getFileInfo(src, true);
+ }
+
+ /**
+ * Counts the call in the metrics and returns
+ * {@code namesystem.getFileInfo(src, false)}; the flag is the only
+ * difference from {@code getFileInfo}.
+ */
+ @Override // ClientProtocol
+ public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
+ metrics.incrFileInfoOps();
+ return namesystem.getFileInfo(src, false);
+ }
+
+ /** Forwards to {@code namesystem.getStats()}. */
+ @Override
+ public long[] getStats() {
+ return namesystem.getStats();
+ }
+
+  /**
+   * Returns the datanode report of the requested type.
+   *
+   * @return the non-null report produced by the namesystem
+   * @throws IOException if the namesystem returns no report
+   */
+  @Override // ClientProtocol
+  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    final DatanodeInfo[] report = namesystem.datanodeReport(type);
+    if (report != null) {
+      return report;
+    }
+    throw new IOException("Cannot find datanode report");
+  }
+
+ /** Forwards to {@code namesystem.setSafeMode(action)}. */
+ @Override // ClientProtocol
+ public boolean setSafeMode(SafeModeAction action) throws IOException {
+ return namesystem.setSafeMode(action);
+ }
+
+ /** Forwards to {@code namesystem.restoreFailedStorage(arg)}. */
+ @Override // ClientProtocol
+ public boolean restoreFailedStorage(String arg)
+ throws AccessControlException {
+ return namesystem.restoreFailedStorage(arg);
+ }
+
+ /** Forwards to {@code namesystem.saveNamespace()}. */
+ @Override // ClientProtocol
+ public void saveNamespace() throws IOException {
+ namesystem.saveNamespace();
+ }
+
+ /**
+ * Asks the datanode manager to refresh its nodes, handing it a newly
+ * constructed {@code HdfsConfiguration} rather than the configuration
+ * this server was created with.
+ */
+ @Override // ClientProtocol
+ public void refreshNodes() throws IOException {
+ namesystem.getBlockManager().getDatanodeManager().refreshNodes(
+ new HdfsConfiguration());
+ }
+
+ /** @return the value of {@code getSyncTxId()} on the namesystem's edit log */
+ @Override // NamenodeProtocol
+ public long getTransactionID() {
+ return namesystem.getEditLog().getSyncTxId();
+ }
+
+ /** Forwards to {@code namesystem.rollEditLog()}. */
+ @Override // NamenodeProtocol
+ public CheckpointSignature rollEditLog() throws IOException {
+ return namesystem.rollEditLog();
+ }
+
+ /** Forwards to {@code getEditLogManifest(sinceTxId)} on the namesystem's edit log. */
+ @Override
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+ throws IOException {
+ return namesystem.getEditLog().getEditLogManifest(sinceTxId);
+ }
+
+ /** Forwards to {@code namesystem.finalizeUpgrade()}. */
+ @Override // ClientProtocol
+ public void finalizeUpgrade() throws IOException {
+ namesystem.finalizeUpgrade();
+ }
+
+ /** Forwards to {@code namesystem.distributedUpgradeProgress(action)}. */
+ @Override // ClientProtocol
+ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
+ throws IOException {
+ return namesystem.distributedUpgradeProgress(action);
+ }
+
+ /** Forwards to {@code namesystem.metaSave(filename)}. */
+ @Override // ClientProtocol
+ public void metaSave(String filename) throws IOException {
+ namesystem.metaSave(filename);
+ }
+
+  /**
+   * Lists files with corrupt blocks.
+   * The paths of the entries returned by the namesystem are collected in
+   * iteration order. The cookie handed back is the block name of the last
+   * entry, or the empty string when there are no entries.
+   */
+  @Override // ClientProtocol
+  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
+      throws IOException {
+    final Collection<FSNamesystem.CorruptFileBlockInfo> corrupt =
+        namesystem.listCorruptFileBlocks(path, cookie);
+
+    final String[] paths = new String[corrupt.size()];
+    String nextCookie = "";
+    int next = 0;
+    for (FSNamesystem.CorruptFileBlockInfo info : corrupt) {
+      paths[next] = info.path;
+      nextCookie = info.block.getBlockName();
+      next++;
+    }
+    return new CorruptFileBlocks(paths, nextCookie);
+  }
+
+ /**
+ * Tell all datanodes to use a new, non-persistent bandwidth value for
+ * dfs.datanode.balance.bandwidthPerSec.
+ * The value is handed to the datanode manager of the block manager.
+ * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
+ * @throws IOException
+ */
+ @Override // ClientProtocol
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
+ }
+
+ /** Forwards to {@code namesystem.getContentSummary(path)}. */
+ @Override // ClientProtocol
+ public ContentSummary getContentSummary(String path) throws IOException {
+ return namesystem.getContentSummary(path);
+ }
+
+ /** Forwards all arguments unchanged to {@code namesystem.setQuota}. */
+ @Override // ClientProtocol
+ public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
+ throws IOException {
+ namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
+ }
+
+ /** Forwards to {@code namesystem.fsync(src, clientName)}. */
+ @Override // ClientProtocol
+ public void fsync(String src, String clientName) throws IOException {
+ namesystem.fsync(src, clientName);
+ }
+
+ /** Forwards to {@code namesystem.setTimes(src, mtime, atime)}. */
+ @Override // ClientProtocol
+ public void setTimes(String src, long mtime, long atime)
+ throws IOException {
+ namesystem.setTimes(src, mtime, atime);
+ }
+
+  /**
+   * Creates a symbolic link {@code link} pointing at {@code target}, owned
+   * by the current caller. The call is counted in the metrics before any
+   * validation takes place.
+   *
+   * @throws IOException if the link path fails the path-length check or
+   *           the target is the empty string
+   */
+  @Override // ClientProtocol
+  public void createSymlink(String target, String link, FsPermission dirPerms,
+      boolean createParent) throws IOException {
+    metrics.incrCreateSymlinkOps();
+    /* We enforce the MAX_PATH_LENGTH limit even though a symlink target
+     * URI may refer to a non-HDFS file system.
+     */
+    final boolean linkOk = checkPathLength(link);
+    if (!linkOk) {
+      throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
+          " character limit");
+    }
+    final boolean emptyTarget = "".equals(target);
+    if (emptyTarget) {
+      throw new IOException("Invalid symlink target");
+    }
+    final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
+    final PermissionStatus permissions =
+        new PermissionStatus(caller.getShortUserName(), null, dirPerms);
+    namesystem.createSymlink(target, link, permissions, createParent);
+  }
+
+  /**
+   * Resolves the first symlink in the given path, returning a new path
+   * consisting of the target of the symlink and any remaining path
+   * components from the original path.
+   *
+   * @return the symlink target, the resolved path carried by an
+   *         {@code UnresolvedPathException}, or null when the namesystem
+   *         has no file info for {@code path}
+   */
+  @Override // ClientProtocol
+  public String getLinkTarget(String path) throws IOException {
+    metrics.incrGetLinkTargetOps();
+    try {
+      final HdfsFileStatus status = namesystem.getFileInfo(path, false);
+      // NB: getSymlink throws IOException if !status.isSymlink()
+      return (status == null) ? null : status.getSymlink();
+    } catch (UnresolvedPathException e) {
+      return e.getResolvedPath().toString();
+    } catch (UnresolvedLinkException e) {
+      // The NameNode should only throw an UnresolvedPathException
+      throw new AssertionError("UnresolvedLinkException thrown");
+    }
+  }
+
+
+ /**
+ * Checks the datanode's version, registers it with the namesystem and
+ * returns the same registration object that was passed in.
+ */
+ @Override // DatanodeProtocol
+ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
+ throws IOException {
+ verifyVersion(nodeReg.getVersion());
+ namesystem.registerDatanode(nodeReg);
+
+ return nodeReg;
+ }
+
+ /**
+ * Verifies the request and forwards the heartbeat to
+ * {@code namesystem.handleHeartbeat}.
+ *
+ * @return the commands returned by the namesystem
+ */
+ @Override // DatanodeProtocol
+ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ int xmitsInProgress, int xceiverCount, int failedVolumes)
+ throws IOException {
+ verifyRequest(nodeReg);
+ // NOTE(review): xceiverCount is passed before xmitsInProgress here, the
+ // reverse of this method's parameter order; presumably this matches the
+ // parameter order of handleHeartbeat - confirm against FSNamesystem.
+ return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
+ blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
+ }
+
+  /**
+   * Processes a block report from a datanode.
+   * The request is verified, the raw long array is wrapped in a
+   * {@code BlockListAsLongs} and handed to the block manager.
+   *
+   * @return a {@code Finalize} command for the pool when the FSImage reports
+   *         the upgrade as finalized, otherwise null
+   */
+  @Override // DatanodeProtocol
+  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
+      String poolId, long[] blocks) throws IOException {
+    verifyRequest(nodeReg);
+    final BlockListAsLongs report = new BlockListAsLongs(blocks);
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+          + "from " + nodeReg.getName() + " " + report.getNumberOfBlocks()
+          + " blocks");
+    }
+
+    namesystem.getBlockManager().processReport(nodeReg, poolId, report);
+    return nn.getFSImage().isUpgradeFinalized()
+        ? new DatanodeCommand.Finalize(poolId)
+        : null;
+  }
+
+  /**
+   * Notifies the block manager of blocks a datanode has received.
+   * {@code blocks} and {@code delHints} are walked in parallel, so entry
+   * {@code i} of one is paired with entry {@code i} of the other.
+   */
+  @Override // DatanodeProtocol
+  public void blockReceived(DatanodeRegistration nodeReg, String poolId,
+      Block blocks[], String delHints[]) throws IOException {
+    verifyRequest(nodeReg);
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
+          + "from " + nodeReg.getName() + " " + blocks.length + " blocks.");
+    }
+    int index = 0;
+    while (index < blocks.length) {
+      namesystem.getBlockManager().blockReceived(
+          nodeReg, poolId, blocks[index], delHints[index]);
+      index++;
+    }
+  }
+
+  /**
+   * Handles an error report from a datanode.
+   * {@code NOTIFY} reports are only logged and skip request verification.
+   * Every other code is verified first: disk errors are logged as warnings,
+   * and a fatal disk error additionally removes the datanode through the
+   * datanode manager.
+   */
+  @Override // DatanodeProtocol
+  public void errorReport(DatanodeRegistration nodeReg,
+      int errorCode, String msg) throws IOException {
+    final String dnName =
+        (nodeReg != null) ? nodeReg.getName() : "unknown DataNode";
+
+    if (errorCode == DatanodeProtocol.NOTIFY) {
+      LOG.info("Error report from " + dnName + ": " + msg);
+      return;
+    }
+    verifyRequest(nodeReg);
+
+    if (errorCode == DatanodeProtocol.DISK_ERROR) {
+      LOG.warn("Disk error on " + dnName + ": " + msg);
+      return;
+    }
+    if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
+      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
+      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);
+      return;
+    }
+    LOG.info("Error report from " + dnName + ": " + msg);
+  }
+
+ /** Forwards to {@code namesystem.getNamespaceInfo()}. */
+ @Override // DatanodeProtocol, NamenodeProtocol
+ public NamespaceInfo versionRequest() throws IOException {
+ return namesystem.getNamespaceInfo();
+ }
+
+ /** Forwards to {@code namesystem.processDistributedUpgradeCommand(comm)}. */
+ @Override // DatanodeProtocol
+ public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
+ return namesystem.processDistributedUpgradeCommand(comm);
+ }
+
+  /**
+   * Verify request.
+   *
+   * Checks the version carried by the registration and then compares its
+   * registration ID with the one held by the namesystem.
+   *
+   * @param nodeReg registration of the calling node
+   * @throws IOException if the version check fails
+   * @throws UnregisteredNodeException if the registration IDs differ
+   */
+  void verifyRequest(NodeRegistration nodeReg) throws IOException {
+    verifyVersion(nodeReg.getVersion());
+    if (namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
+      return;
+    }
+    LOG.warn("Invalid registrationID - expected: "
+        + namesystem.getRegistrationID() + " received: "
+        + nodeReg.getRegistrationID());
+    throw new UnregisteredNodeException(nodeReg);
+  }
+
+
+  /**
+   * Reloads the service-level ACLs on the client RPC server and, when one
+   * exists, on the service RPC server. Each refresh is given a newly
+   * constructed {@code Configuration}.
+   *
+   * @throws AuthorizationException if service-level authorization was not
+   *           enabled when this server was constructed
+   */
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshServiceAcl() throws IOException {
+    if (!serviceAuthEnabled) {
+      throw new AuthorizationException("Service Level Authorization not enabled!");
+    }
+
+    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    if (this.serviceRpcServer == null) {
+      return;
+    }
+    this.serviceRpcServer.refreshServiceAcl(
+        new Configuration(), new HDFSPolicyProvider());
+  }
+
+ /**
+ * Logs the short name of the requesting user and refreshes the
+ * user-to-groups mapping service.
+ */
+ @Override // RefreshAuthorizationPolicyProtocol
+ public void refreshUserToGroupsMappings() throws IOException {
+ LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ Groups.getUserToGroupsMappingService().refresh();
+ }
+
+ /**
+ * Logs the request and delegates to
+ * {@code ProxyUsers.refreshSuperUserGroupsConfiguration()}.
+ */
+ @Override // RefreshAuthorizationPolicyProtocol
+ public void refreshSuperUserGroupsConfiguration() {
+ LOG.info("Refreshing SuperUser proxy group mapping list ");
+
+ ProxyUsers.refreshSuperUserGroupsConfiguration();
+ }
+
+ /**
+ * Returns the group names of a remote-user {@code UserGroupInformation}
+ * created for the given user name.
+ *
+ * @param user the user name to look up
+ */
+ @Override // GetUserMappingsProtocol
+ public String[] getGroupsForUser(String user) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting groups for user " + user);
+ }
+ return UserGroupInformation.createRemoteUser(user).getGroupNames();
+ }
+
+
+  /**
+   * Verify version.
+   *
+   * @param version the layout version reported by the caller
+   * @throws IncorrectVersionException if {@code version} differs from
+   *           {@code HdfsConstants.LAYOUT_VERSION}
+   */
+  void verifyVersion(int version) throws IOException {
+    if (version == HdfsConstants.LAYOUT_VERSION) {
+      return;
+    }
+    throw new IncorrectVersionException(version, "data node");
+  }
+
+  /**
+   * @return the remote address reported by {@code Server.getRemoteAddress()},
+   *         or the empty string when it reports null
+   */
+  private static String getClientMachine() {
+    final String remote = Server.getRemoteAddress();
+    return (remote == null) ? "" : remote;
+  }
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Sep 12 18:59:42 2011
@@ -173,7 +173,7 @@ public class NamenodeFsck {
out.println(msg);
namenode.getNamesystem().logFsckEvent(path, remoteAddress);
- final HdfsFileStatus file = namenode.getFileInfo(path);
+ final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
if (file != null) {
if (showCorruptFileBlocks) {
@@ -250,7 +250,8 @@ public class NamenodeFsck {
res.totalDirs++;
do {
assert lastReturnedName != null;
- thisListing = namenode.getListing(path, lastReturnedName, false);
+ thisListing = namenode.getRpcServer().getListing(
+ path, lastReturnedName, false);
if (thisListing == null) {
return;
}
@@ -385,7 +386,7 @@ public class NamenodeFsck {
break;
case FIXING_DELETE:
if (!isOpen)
- namenode.delete(path, true);
+ namenode.getRpcServer().delete(path, true);
}
}
if (showFiles) {
@@ -414,7 +415,8 @@ public class NamenodeFsck {
String target = lostFound + fullName;
String errmsg = "Failed to move " + fullName + " to /lost+found";
try {
- if (!namenode.mkdirs(target, file.getPermission(), true)) {
+ if (!namenode.getRpcServer().mkdirs(
+ target, file.getPermission(), true)) {
LOG.warn(errmsg);
return;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Mon Sep 12 18:59:42 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
@@ -354,7 +355,7 @@ class NamenodeJspHelper {
}
}
- static String getDelegationToken(final NameNode nn,
+ static String getDelegationToken(final NamenodeProtocols nn,
HttpServletRequest request, Configuration conf,
final UserGroupInformation ugi) throws IOException, InterruptedException {
Token<DelegationTokenIdentifier> token = ugi
@@ -381,7 +382,8 @@ class NamenodeJspHelper {
.getAttribute(JspHelper.CURRENT_CONF);
final DatanodeID datanode = getRandomDatanode(nn);
UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
- String tokenString = getDelegationToken(nn, request, conf, ugi);
+ String tokenString = getDelegationToken(
+ nn.getRpcServer(), request, conf, ugi);
// if the user is defined, get a delegation token and stringify it
final String redirectLocation;
final String nodeToRedirect;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Mon Sep 12 18:59:42 2011
@@ -70,7 +70,7 @@ public class RenewDelegationTokenServlet
try {
long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
public Long run() throws Exception {
- return nn.renewDelegationToken(token);
+ return nn.getRpcServer().renewDelegationToken(token);
}
});
PrintStream os = new PrintStream(resp.getOutputStream());
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Sep 12 18:59:42 2011
@@ -1026,6 +1026,14 @@ public class MiniDFSCluster {
}
/**
+ * Get an instance of the NameNode's RPC handler.
+ */
+ public NamenodeProtocols getNameNodeRpc() {
+ checkSingleNameNode();
+ return getNameNode(0).getRpcServer();
+ }
+
+ /**
* Gets the NameNode for the index. May be null.
*/
public NameNode getNameNode(int nnIndex) {
@@ -1361,7 +1369,15 @@ public class MiniDFSCluster {
if (nameNode == null) {
return false;
}
- long[] sizes = nameNode.getStats();
+ long[] sizes;
+ try {
+ sizes = nameNode.getRpcServer().getStats();
+ } catch (IOException ioe) {
+ // The call above should never throw.
+ // It only declares IOE because it is exposed via RPC
+ throw new AssertionError("Unexpected IOE thrown: "
+ + StringUtils.stringifyException(ioe));
+ }
boolean isUp = false;
synchronized (this) {
isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Mon Sep 12 18:59:42 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;
@@ -45,7 +46,7 @@ public class TestClientProtocolForPipeli
try {
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
- NameNode namenode = cluster.getNameNode();
+ NamenodeProtocols namenode = cluster.getNameNodeRpc();
/* Test writing to finalized replicas */
Path file = new Path("dataprotocol.dat");
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java Mon Sep 12 18:59:42 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Client;
@@ -190,7 +191,7 @@ public class TestDFSClientRetries extend
final int maxRetries = 1; // Allow one retry (total of two calls)
conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
- NameNode mockNN = mock(NameNode.class);
+ NamenodeProtocols mockNN = mock(NamenodeProtocols.class);
Answer<Object> answer = new ThrowsException(new IOException()) {
int retryCount = 0;
@@ -240,8 +241,8 @@ public class TestDFSClientRetries extend
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
- NameNode preSpyNN = cluster.getNameNode();
- NameNode spyNN = spy(preSpyNN);
+ NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+ NamenodeProtocols spyNN = spy(preSpyNN);
DFSClient client = new DFSClient(null, spyNN, conf, null);
int maxBlockAcquires = client.getMaxBlockAcquireFailures();
assertTrue(maxBlockAcquires > 0);
@@ -305,11 +306,11 @@ public class TestDFSClientRetries extend
*/
private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
private int failuresLeft;
- private NameNode realNN;
+ private NamenodeProtocols realNN;
- public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+ public FailNTimesAnswer(NamenodeProtocols preSpyNN, int timesToFail) {
failuresLeft = timesToFail;
- this.realNN = realNN;
+ this.realNN = preSpyNN;
}
public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
@@ -603,7 +604,8 @@ public class TestDFSClientRetries extend
//stop the first datanode
final List<LocatedBlock> locatedblocks = DFSClient.callGetBlockLocations(
- cluster.getNameNode(), f, 0, Long.MAX_VALUE).getLocatedBlocks();
+ cluster.getNameNodeRpc(), f, 0, Long.MAX_VALUE)
+ .getLocatedBlocks();
final DatanodeInfo first = locatedblocks.get(0).getLocations()[0];
cluster.stopDataNode(first.getName());
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java Mon Sep 12 18:59:42 2011
@@ -293,10 +293,11 @@ public class TestDecommission {
}
private void verifyStats(NameNode namenode, FSNamesystem fsn,
- DatanodeInfo node, boolean decommissioning) throws InterruptedException {
+ DatanodeInfo node, boolean decommissioning)
+ throws InterruptedException, IOException {
// Do the stats check over 10 iterations
for (int i = 0; i < 10; i++) {
- long[] newStats = namenode.getStats();
+ long[] newStats = namenode.getRpcServer().getStats();
// For decommissioning nodes, ensure capacity of the DN is no longer
// counted. Only used space of the DN is counted in cluster capacity
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java Mon Sep 12 18:59:42 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
@@ -151,8 +151,8 @@ public class TestFileAppend4 {
try {
cluster.waitActive();
- NameNode preSpyNN = cluster.getNameNode();
- NameNode spyNN = spy(preSpyNN);
+ NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+ NamenodeProtocols spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
@@ -222,8 +222,8 @@ public class TestFileAppend4 {
try {
cluster.waitActive();
- NameNode preSpyNN = cluster.getNameNode();
- NameNode spyNN = spy(preSpyNN);
+ NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+ NamenodeProtocols spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer =
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java Mon Sep 12 18:59:42 2011
@@ -420,7 +420,7 @@ public class TestFileCreation extends ju
final Path f = new Path("/foo.txt");
createFile(dfs, f, 3);
try {
- cluster.getNameNode().addBlock(f.toString(),
+ cluster.getNameNodeRpc().addBlock(f.toString(),
client.clientName, null, null);
fail();
} catch(IOException ioe) {
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java Mon Sep 12 18:59:42 2011
@@ -106,7 +106,7 @@ public class TestLeaseRecovery extends j
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
- cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+ cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
// expire lease to trigger block recovery.
waitLeaseRecovery(cluster);
@@ -129,14 +129,14 @@ public class TestLeaseRecovery extends j
filestr = "/foo.safemode";
filepath = new Path(filestr);
dfs.create(filepath, (short)1);
- cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+ cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
assertTrue(dfs.dfs.exists(filestr));
DFSTestUtil.waitReplication(dfs, filepath, (short)1);
waitLeaseRecovery(cluster);
// verify that we still cannot recover the lease
LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1);
- cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
}
finally {
if (cluster != null) {cluster.shutdown();}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java Mon Sep 12 18:59:42 2011
@@ -100,7 +100,7 @@ public class TestPipelines {
ofs.writeBytes("Some more stuff to write");
((DFSOutputStream) ofs.getWrappedStream()).hflush();
- List<LocatedBlock> lb = cluster.getNameNode().getBlockLocations(
+ List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
String bpid = cluster.getNamesystem().getBlockPoolId();
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java Mon Sep 12 18:59:42 2011
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
/**
* This class defines a number of static helper methods used by the
@@ -121,7 +121,7 @@ public class UpgradeUtilities {
.manageNameDfsDirs(false)
.build();
- NameNode namenode = cluster.getNameNode();
+ NamenodeProtocols namenode = cluster.getNameNodeRpc();
namenodeStorageNamespaceID = namenode.versionRequest().getNamespaceID();
namenodeStorageFsscTime = namenode.versionRequest().getCTime();
namenodeStorageClusterID = namenode.versionRequest().getClusterID();
@@ -517,7 +517,7 @@ public class UpgradeUtilities {
*/
public static int getCurrentNamespaceID(MiniDFSCluster cluster) throws IOException {
if (cluster != null) {
- return cluster.getNameNode().versionRequest().getNamespaceID();
+ return cluster.getNameNodeRpc().versionRequest().getNamespaceID();
}
return namenodeStorageNamespaceID;
}
@@ -528,7 +528,7 @@ public class UpgradeUtilities {
*/
public static String getCurrentClusterID(MiniDFSCluster cluster) throws IOException {
if (cluster != null) {
- return cluster.getNameNode().versionRequest().getClusterID();
+ return cluster.getNameNodeRpc().versionRequest().getClusterID();
}
return namenodeStorageClusterID;
}
@@ -539,7 +539,7 @@ public class UpgradeUtilities {
*/
public static String getCurrentBlockPoolID(MiniDFSCluster cluster) throws IOException {
if (cluster != null) {
- return cluster.getNameNode().versionRequest().getBlockPoolID();
+ return cluster.getNameNodeRpc().versionRequest().getBlockPoolID();
}
return namenodeStorageBlockPoolID;
}
@@ -554,7 +554,7 @@ public class UpgradeUtilities {
*/
public static long getCurrentFsscTime(MiniDFSCluster cluster) throws IOException {
if (cluster != null) {
- return cluster.getNameNode().versionRequest().getCTime();
+ return cluster.getNameNodeRpc().versionRequest().getCTime();
}
return namenodeStorageFsscTime;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Mon Sep 12 18:59:42 2011
@@ -375,11 +375,11 @@ public class TestBlockToken {
Path filePath = new Path(fileName);
FSDataOutputStream out = fs.create(filePath, (short) 1);
out.write(new byte[1000]);
- LocatedBlocks locatedBlocks = cluster.getNameNode().getBlockLocations(
+ LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
fileName, 0, 1000);
while (locatedBlocks.getLastLocatedBlock() == null) {
Thread.sleep(100);
- locatedBlocks = cluster.getNameNode().getBlockLocations(fileName, 0,
+ locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(fileName, 0,
1000);
}
Token<BlockTokenIdentifier> token = locatedBlocks.getLastLocatedBlock()
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Mon Sep 12 18:59:42 2011
@@ -89,7 +89,7 @@ public class TestBalancerWithMultipleNam
this.cluster = cluster;
clients = new ClientProtocol[nNameNodes];
for(int i = 0; i < nNameNodes; i++) {
- clients[i] = cluster.getNameNode(i);
+ clients[i] = cluster.getNameNode(i).getRpcServer();
}
replication = (short)Math.max(1, nDataNodes - 1);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Mon Sep 12 18:59:42 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@@ -314,6 +315,7 @@ public class TestBlockTokenWithDFS {
assertEquals(numDataNodes, cluster.getDataNodes().size());
final NameNode nn = cluster.getNameNode();
+ final NamenodeProtocols nnProto = nn.getRpcServer();
final BlockManager bm = nn.getNamesystem().getBlockManager();
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
@@ -344,7 +346,7 @@ public class TestBlockTokenWithDFS {
new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), conf);
- List<LocatedBlock> locatedBlocks = nn.getBlockLocations(
+ List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
LocatedBlock lblock = locatedBlocks.get(0); // first block
Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Mon Sep 12 18:59:42 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.GenericTestUtils;
@@ -139,7 +138,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
List<LocatedBlock> blocksAfterReport =
@@ -181,9 +180,10 @@ public class TestBlockReport {
List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
List<Integer> removedIndex = new ArrayList<Integer>();
- List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
- filePath.toString(), FILE_START,
- FILE_SIZE).getLocatedBlocks();
+ List<LocatedBlock> lBlocks =
+ cluster.getNameNodeRpc().getBlockLocations(
+ filePath.toString(), FILE_START,
+ FILE_SIZE).getLocatedBlocks();
while (removedIndex.size() != 2) {
int newRemoveIndex = rand.nextInt(lBlocks.size());
@@ -218,7 +218,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
@@ -258,7 +258,8 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
+ DatanodeCommand dnCmd =
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
if(LOG.isDebugEnabled()) {
LOG.debug("Got the command: " + dnCmd);
@@ -310,7 +311,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of PendingReplication Blocks",
@@ -359,7 +360,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of Corrupted blocks",
@@ -381,7 +382,7 @@ public class TestBlockReport {
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
}
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
@@ -431,7 +432,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of PendingReplication blocks",
@@ -477,7 +478,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNode().blockReport(dnR, poolId,
+ cluster.getNameNodeRpc().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of PendingReplication blocks",
@@ -590,7 +591,7 @@ public class TestBlockReport {
DFSTestUtil.createFile(fs, filePath, fileSize,
REPL_FACTOR, rand.nextLong());
- return locatedToBlocks(cluster.getNameNode()
+ return locatedToBlocks(cluster.getNameNodeRpc()
.getBlockLocations(filePath.toString(), FILE_START,
fileSize).getLocatedBlocks(), null);
}
@@ -707,7 +708,8 @@ public class TestBlockReport {
private Block findBlock(Path path, long size) throws IOException {
Block ret;
List<LocatedBlock> lbs =
- cluster.getNameNode().getBlockLocations(path.toString(),
+ cluster.getNameNodeRpc()
+ .getBlockLocations(path.toString(),
FILE_START, size).getLocatedBlocks();
LocatedBlock lb = lbs.get(lbs.size() - 1);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Sep 12 18:59:42 2011
@@ -40,8 +40,8 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
@@ -144,7 +144,7 @@ public class TestDataNodeVolumeFailure {
String bpid = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
- cluster.getNameNode().blockReport(dnR, bpid, bReport);
+ cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport);
// verify number of blocks and files...
verify(filename, filesize);
@@ -216,7 +216,7 @@ public class TestDataNodeVolumeFailure {
* @throws IOException
*/
private void triggerFailure(String path, long size) throws IOException {
- NameNode nn = cluster.getNameNode();
+ NamenodeProtocols nn = cluster.getNameNodeRpc();
List<LocatedBlock> locatedBlocks =
nn.getBlockLocations(path, 0, size).getLocatedBlocks();
@@ -291,7 +291,7 @@ public class TestDataNodeVolumeFailure {
throws IOException {
int total = 0;
- NameNode nn = cluster.getNameNode();
+ NamenodeProtocols nn = cluster.getNameNodeRpc();
List<LocatedBlock> locatedBlocks =
nn.getBlockLocations(path, 0, size).getLocatedBlocks();
//System.out.println("Number of blocks: " + locatedBlocks.size());
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Mon Sep 12 18:59:42 2011
@@ -109,7 +109,7 @@ public class TestTransferRbw {
final DatanodeInfo oldnodeinfo;
{
- final DatanodeInfo[] datatnodeinfos = cluster.getNameNode(
+ final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc(
).getDatanodeReport(DatanodeReportType.LIVE);
Assert.assertEquals(2, datatnodeinfos.length);
int i = 0;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Mon Sep 12 18:59:42 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -100,6 +101,7 @@ public class NNThroughputBenchmark {
static Configuration config;
static NameNode nameNode;
+ static NamenodeProtocols nameNodeProto;
NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
config = conf;
@@ -119,6 +121,7 @@ public class NNThroughputBenchmark {
// Start the NameNode
String[] argv = new String[] {};
nameNode = NameNode.createNameNode(argv, config);
+ nameNodeProto = nameNode.getRpcServer();
}
void close() throws IOException {
@@ -264,9 +267,9 @@ public class NNThroughputBenchmark {
}
void cleanUp() throws IOException {
- nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
if(!keepResults)
- nameNode.delete(getBaseDir(), true);
+ nameNodeProto.delete(getBaseDir(), true);
}
int getNumOpsExecuted() {
@@ -397,7 +400,7 @@ public class NNThroughputBenchmark {
void benchmarkOne() throws IOException {
for(int idx = 0; idx < opsPerThread; idx++) {
if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
- nameNode.refreshUserToGroupsMappings();
+ nameNodeProto.refreshUserToGroupsMappings();
long stat = statsOp.executeOp(daemonId, idx, arg1);
localNumOpsExecuted++;
localCumulativeTime += stat;
@@ -458,9 +461,9 @@ public class NNThroughputBenchmark {
*/
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
- nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
long start = System.currentTimeMillis();
- nameNode.delete(BASE_DIR_NAME, true);
+ nameNodeProto.delete(BASE_DIR_NAME, true);
long end = System.currentTimeMillis();
return end-start;
}
@@ -522,7 +525,7 @@ public class NNThroughputBenchmark {
void generateInputs(int[] opsPerThread) throws IOException {
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
- nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
// int generatedFileIdx = 0;
LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
fileNames = new String[numThreads][];
@@ -554,12 +557,12 @@ public class NNThroughputBenchmark {
throws IOException {
long start = System.currentTimeMillis();
// dummyActionNoSynch(fileIdx);
- nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+ nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
clientName, new EnumSetWritable<CreateFlag>(EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
long end = System.currentTimeMillis();
for(boolean written = !closeUponCreate; !written;
- written = nameNode.complete(fileNames[daemonId][inputIdx],
+ written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
clientName, null));
return end-start;
}
@@ -627,11 +630,11 @@ public class NNThroughputBenchmark {
}
// use the same files for open
super.generateInputs(opsPerThread);
- if(nameNode.getFileInfo(opCreate.getBaseDir()) != null
- && nameNode.getFileInfo(getBaseDir()) == null) {
- nameNode.rename(opCreate.getBaseDir(), getBaseDir());
+ if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
+ && nameNodeProto.getFileInfo(getBaseDir()) == null) {
+ nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
}
- if(nameNode.getFileInfo(getBaseDir()) == null) {
+ if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
throw new IOException(getBaseDir() + " does not exist.");
}
}
@@ -642,7 +645,7 @@ public class NNThroughputBenchmark {
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = System.currentTimeMillis();
- nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+ nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
long end = System.currentTimeMillis();
return end-start;
}
@@ -670,7 +673,7 @@ public class NNThroughputBenchmark {
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = System.currentTimeMillis();
- nameNode.delete(fileNames[daemonId][inputIdx], false);
+ nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
long end = System.currentTimeMillis();
return end-start;
}
@@ -698,7 +701,7 @@ public class NNThroughputBenchmark {
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = System.currentTimeMillis();
- nameNode.getFileInfo(fileNames[daemonId][inputIdx]);
+ nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
long end = System.currentTimeMillis();
return end-start;
}
@@ -740,7 +743,7 @@ public class NNThroughputBenchmark {
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = System.currentTimeMillis();
- nameNode.rename(fileNames[daemonId][inputIdx],
+ nameNodeProto.rename(fileNames[daemonId][inputIdx],
destNames[daemonId][inputIdx]);
long end = System.currentTimeMillis();
return end-start;
@@ -787,11 +790,11 @@ public class NNThroughputBenchmark {
void register() throws IOException {
// get versions from the namenode
- nsInfo = nameNode.versionRequest();
+ nsInfo = nameNodeProto.versionRequest();
dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
DataNode.setNewStorageID(dnRegistration);
// register datanode
- dnRegistration = nameNode.registerDatanode(dnRegistration);
+ dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
}
/**
@@ -801,7 +804,7 @@ public class NNThroughputBenchmark {
void sendHeartbeat() throws IOException {
// register datanode
// TODO:FEDERATION currently a single block pool is supported
- DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
+ DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
@@ -846,7 +849,7 @@ public class NNThroughputBenchmark {
int replicateBlocks() throws IOException {
// register datanode
// TODO:FEDERATION currently a single block pool is supported
- DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
+ DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
@@ -877,7 +880,7 @@ public class NNThroughputBenchmark {
receivedDNReg.setStorageInfo(
new DataStorage(nsInfo, dnInfo.getStorageID()));
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
- nameNode.blockReceived( receivedDNReg,
+ nameNodeProto.blockReceived( receivedDNReg,
nameNode.getNamesystem().getBlockPoolId(),
new Block[] {blocks[i]},
new String[] {DataNode.EMPTY_DEL_HINT});
@@ -968,14 +971,14 @@ public class NNThroughputBenchmark {
FileNameGenerator nameGenerator;
nameGenerator = new FileNameGenerator(getBaseDir(), 100);
String clientName = getClientName(007);
- nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
for(int idx=0; idx < nrFiles; idx++) {
String fileName = nameGenerator.getNextFileName("ThroughputBench");
- nameNode.create(fileName, FsPermission.getDefault(), clientName,
+ nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
BLOCK_SIZE);
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
- nameNode.complete(fileName, clientName, lastBlock);
+ nameNodeProto.complete(fileName, clientName, lastBlock);
}
// prepare block reports
for(int idx=0; idx < nrDatanodes; idx++) {
@@ -987,12 +990,12 @@ public class NNThroughputBenchmark {
throws IOException {
ExtendedBlock prevBlock = null;
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
- LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock, null);
+ LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
prevBlock = loc.getBlock();
for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
- nameNode.blockReceived(
+ nameNodeProto.blockReceived(
datanodes[dnIdx].dnRegistration,
loc.getBlock().getBlockPoolId(),
new Block[] {loc.getBlock().getLocalBlock()},
@@ -1013,7 +1016,7 @@ public class NNThroughputBenchmark {
assert daemonId < numThreads : "Wrong daemonId.";
TinyDatanode dn = datanodes[daemonId];
long start = System.currentTimeMillis();
- nameNode.blockReport(dn.dnRegistration, nameNode.getNamesystem()
+ nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
.getBlockPoolId(), dn.getBlockReportList());
long end = System.currentTimeMillis();
return end-start;
@@ -1146,7 +1149,7 @@ public class NNThroughputBenchmark {
LOG.info("Datanode " + dn.getName() + " is decommissioned.");
}
excludeFile.close();
- nameNode.refreshNodes();
+ nameNodeProto.refreshNodes();
}
/**
@@ -1160,8 +1163,8 @@ public class NNThroughputBenchmark {
assert daemonId < numThreads : "Wrong daemonId.";
long start = System.currentTimeMillis();
// compute data-node work
- int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
- .getNamesystem().getBlockManager());
+ int work = BlockManagerTestUtil.getComputedDatanodeWork(
+ nameNode.getNamesystem().getBlockManager());
long end = System.currentTimeMillis();
numPendingBlocks += work;
if(work == 0)
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Mon Sep 12 18:59:42 2011
@@ -47,14 +47,6 @@ public class NameNodeAdapter {
src, offset, length, false, true);
}
- /**
- * Get the internal RPC server instance.
- * @return rpc server
- */
- public static Server getRpcServer(NameNode namenode) {
- return namenode.server;
- }
-
public static DelegationTokenSecretManager getDtSecretManager(
final FSNamesystem ns) {
return ns.getDelegationTokenSecretManager();
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1169868&r1=1169867&r2=1169868&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Mon Sep 12 18:59:42 2011
@@ -239,10 +239,10 @@ public class OfflineEditsViewerHelper {
LOG.info("Innocuous exception", e);
}
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
- cluster.getNameNode(), filePath, 0L, bytes.length);
+ cluster.getNameNodeRpc(), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
// Force a roll so we get an OP_END_LOG_SEGMENT txn
- return cluster.getNameNode().rollEditLog();
+ return cluster.getNameNodeRpc().rollEditLog();
}
}
|