From: inigoiri@apache.org
To: common-commits@hadoop.apache.org
Date: Thu, 17 Aug 2017 00:11:54 -0000
Subject: [09/24] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8854913c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 24792bb..4bae71e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -17,16 +17,109 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; + +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import 
java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.inotify.EventBatchList; +import org.apache.hadoop.hdfs.protocol.AddingECPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import 
org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; /** * This class is responsible for handling all of the RPC calls to the Router. It is @@ -36,10 +129,42 @@ import com.google.common.annotations.VisibleForTesting; * the requests to the active * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}. */ -public class RouterRpcServer extends AbstractService { +public class RouterRpcServer extends AbstractService implements ClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterRpcServer.class); + + + /** Configuration for the RPC server. */ + private Configuration conf; + + /** Identifier for the super user. */ + private final String superUser; + /** Identifier for the super group. */ + private final String superGroup; + + /** Router using this RPC server. */ + private final Router router; /** The RPC server that listens to requests from clients. */ private final Server rpcServer; + /** The address for this RPC server. */ + private final InetSocketAddress rpcAddress; + + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + + + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + + /** Interface to map global name space to HDFS subcluster name spaces. */ + private final FileSubclusterResolver subclusterResolver; + + + /** Category of the operation that a thread is executing. */ + private final ThreadLocal opCategory = new ThreadLocal<>(); + /** * Construct a router RPC server. @@ -54,31 +179,94 @@ public class RouterRpcServer extends AbstractService { throws IOException { super(RouterRpcServer.class.getName()); - this.rpcServer = null; - } + this.conf = configuration; + this.router = router; + this.namenodeResolver = nnResolver; + this.subclusterResolver = fileResolver; - /** - * Allow access to the client RPC server for testing. - * - * @return The RPC server. 
- */ - @VisibleForTesting - public Server getServer() { - return this.rpcServer; + // User and group for reporting + this.superUser = System.getProperty("user.name"); + this.superGroup = this.conf.get( + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); + + // RPC server settings + int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, + DFS_ROUTER_HANDLER_COUNT_DEFAULT); + + int readerCount = this.conf.getInt(DFS_ROUTER_READER_COUNT_KEY, + DFS_ROUTER_READER_COUNT_DEFAULT); + + int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY, + DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT); + + // Override Hadoop Common IPC setting + int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY, + DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT); + this.conf.setInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, + readerQueueSize); + + RPC.setProtocolEngine(this.conf, ClientNamenodeProtocolPB.class, + ProtobufRpcEngine.class); + + ClientNamenodeProtocolServerSideTranslatorPB + clientProtocolServerTranslator = + new ClientNamenodeProtocolServerSideTranslatorPB(this); + BlockingService clientNNPbService = ClientNamenodeProtocol + .newReflectiveBlockingService(clientProtocolServerTranslator); + + InetSocketAddress confRpcAddress = conf.getSocketAddr( + DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, + DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, + DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_DEFAULT, + DFSConfigKeys.DFS_ROUTER_RPC_PORT_DEFAULT); + LOG.info("RPC server binding to {} with {} handlers for Router {}", + confRpcAddress, handlerCount, this.router.getRouterId()); + + this.rpcServer = new RPC.Builder(this.conf) + .setProtocol(ClientNamenodeProtocolPB.class) + .setInstance(clientNNPbService) + .setBindAddress(confRpcAddress.getHostName()) + .setPort(confRpcAddress.getPort()) + .setNumHandlers(handlerCount) + .setnumReaders(readerCount) + .setQueueSizePerHandler(handlerQueueSize) + .setVerbose(false) + .build(); + // We don't want the server to log the full stack trace for some exceptions + this.rpcServer.addTerseExceptions( + RemoteException.class, + StandbyException.class, + SafeModeException.class, + FileNotFoundException.class, + FileAlreadyExistsException.class, + AccessControlException.class, + LeaseExpiredException.class, + NotReplicatedYetException.class, + IOException.class); + + // The RPC-server port can be ephemeral... ensure we have the correct info + InetSocketAddress listenAddress = this.rpcServer.getListenerAddress(); + this.rpcAddress = new InetSocketAddress( + confRpcAddress.getHostName(), listenAddress.getPort()); + + // Create the client + this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), + this.namenodeResolver); } @Override protected void serviceInit(Configuration configuration) throws Exception { + this.conf = configuration; super.serviceInit(configuration); } - /** - * Start client and service RPC servers. - */ @Override protected void serviceStart() throws Exception { if (this.rpcServer != null) { this.rpcServer.start(); + LOG.info("Router RPC up at: {}", this.getRpcAddress()); } super.serviceStart(); } @@ -92,11 +280,1652 @@ public class RouterRpcServer extends AbstractService { } /** - * Wait until the RPC servers have shutdown. + * Get the RPC client to the Namenode. + * + * @return RPC clients to the Namenodes. 
*/ - void join() throws InterruptedException { - if (this.rpcServer != null) { - this.rpcServer.join(); + public RouterRpcClient getRPCClient() { + return rpcClient; + } + + /** + * Allow access to the client RPC server for testing. + * + * @return The RPC server. + */ + @VisibleForTesting + public Server getServer() { + return rpcServer; + } + + /** + * Get the RPC address of the service. + * + * @return RPC service address. + */ + public InetSocketAddress getRpcAddress() { + return rpcAddress; + } + + /** + * Check if the Router is in safe mode. We should only see READ, WRITE, and + * UNCHECKED. It includes a default handler when we haven't implemented an + * operation. If not supported, it always throws an exception reporting the + * operation. + * + * @param op Category of the operation to check. + * @param supported If the operation is supported or not. If not, it will + * throw an UnsupportedOperationException. + * @throws StandbyException If the Router is in safe mode and cannot serve + * client requests. + * @throws UnsupportedOperationException If the operation is not supported. + */ + private void checkOperation(OperationCategory op, boolean supported) + throws StandbyException, UnsupportedOperationException { + checkOperation(op); + + if (!supported) { + String methodName = getMethodName(); + throw new UnsupportedOperationException( + "Operation \"" + methodName + "\" is not supported"); + } + } + + /** + * Check if the Router is in safe mode. We should only see READ, WRITE, and + * UNCHECKED. This function should be called by all ClientProtocol functions. + * + * @param op Category of the operation to check. + * @throws StandbyException If the Router is in safe mode and cannot serve + * client requests. + */ + private void checkOperation(OperationCategory op) throws StandbyException { + // Log the function we are currently calling. + if (LOG.isDebugEnabled()) { + String methodName = getMethodName(); + LOG.debug("Proxying operation: {}", methodName); + } + + // Store the category of the operation for this thread + opCategory.set(op); + + // We allow unchecked and read operations + if (op == OperationCategory.UNCHECKED || op == OperationCategory.READ) { + return; + } + + // TODO check Router safe mode and return Standby exception + } + + @Override // ClientProtocol + public Token getDelegationToken(Text renewer) + throws IOException { + checkOperation(OperationCategory.WRITE, false); + return null; + } + + /** + * Get the delegation token from each name service. + * @param renewer Token renewer. + * @return Name service -> Token. 
+ * @throws IOException + */ + public Map> + getDelegationTokens(Text renewer) throws IOException { + checkOperation(OperationCategory.WRITE, false); + return null; + } + + @Override // ClientProtocol + public long renewDelegationToken(Token token) + throws IOException { + checkOperation(OperationCategory.WRITE, false); + return 0; + } + + @Override // ClientProtocol + public void cancelDelegationToken(Token token) + throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public LocatedBlocks getBlockLocations(String src, final long offset, + final long length) throws IOException { + checkOperation(OperationCategory.READ); + + List locations = getLocationsForPath(src, false); + RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations", + new Class[] {String.class, long.class, long.class}, + new RemoteParam(), offset, length); + return (LocatedBlocks) rpcClient.invokeSequential(locations, remoteMethod, + LocatedBlocks.class, null); + } + + @Override // ClientProtocol + public FsServerDefaults getServerDefaults() throws IOException { + checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getServerDefaults"); + String ns = subclusterResolver.getDefaultNamespace(); + return (FsServerDefaults) rpcClient.invokeSingle(ns, method); + } + + @Override // ClientProtocol + public HdfsFileStatus create(String src, FsPermission masked, + String clientName, EnumSetWritable flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + throws IOException { + checkOperation(OperationCategory.WRITE); + + RemoteLocation createLocation = getCreateLocation(src); + RemoteMethod method = new RemoteMethod("create", + new Class[] {String.class, FsPermission.class, String.class, + EnumSetWritable.class, boolean.class, short.class, + long.class, CryptoProtocolVersion[].class, + String.class}, + createLocation.getDest(), masked, clientName, flag, createParent, + replication, blockSize, supportedVersions, ecPolicyName); + return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); + } + + /** + * Get the location to create a file. It checks if the file already exists + * in one of the locations. + * + * @param src Path of the file to check. + * @return The remote location for this file. + * @throws IOException If the file has no creation location. 
+ */ + private RemoteLocation getCreateLocation(final String src) + throws IOException { + + final List locations = getLocationsForPath(src, true); + if (locations == null || locations.isEmpty()) { + throw new IOException("Cannot get locations to create " + src); + } + + RemoteLocation createLocation = locations.get(0); + if (locations.size() > 1) { + try { + // Check if this file already exists in other subclusters + LocatedBlocks existingLocation = getBlockLocations(src, 0, 1); + if (existingLocation != null) { + // Forward to the existing location and let the NN handle the error + LocatedBlock existingLocationLastLocatedBlock = + existingLocation.getLastLocatedBlock(); + if (existingLocationLastLocatedBlock == null) { + // The file has no blocks yet; check the metadata + for (RemoteLocation location : locations) { + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class[] {String.class}, new RemoteParam()); + if (rpcClient.invokeSingle(location, method) != null) { + createLocation = location; + break; + } + } + } else { + ExtendedBlock existingLocationLastBlock = + existingLocationLastLocatedBlock.getBlock(); + String blockPoolId = existingLocationLastBlock.getBlockPoolId(); + createLocation = getLocationForPath(src, true, blockPoolId); + } + } + } catch (FileNotFoundException fne) { + // Ignore if the file is not found + } + } + return createLocation; + } + + // Medium + @Override // ClientProtocol + public LastBlockWithStatus append(String src, final String clientName, + final EnumSetWritable flag) throws IOException { + checkOperation(OperationCategory.WRITE); + + List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("append", + new Class[] {String.class, String.class, EnumSetWritable.class}, + new RemoteParam(), clientName, flag); + return (LastBlockWithStatus) rpcClient.invokeSequential( + locations, method, LastBlockWithStatus.class, null); + } + + // Low + @Override // ClientProtocol + public boolean recoverLease(String src, String clientName) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("recoverLease", + new Class[] {String.class, String.class}, new RemoteParam(), + clientName); + Object result = rpcClient.invokeSequential( + locations, method, Boolean.class, Boolean.TRUE); + return (boolean) result; + } + + @Override // ClientProtocol + public boolean setReplication(String src, short replication) + throws IOException { + checkOperation(OperationCategory.WRITE); + + List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setReplication", + new Class[] {String.class, short.class}, new RemoteParam(), + replication); + Object result = rpcClient.invokeSequential( + locations, method, Boolean.class, Boolean.TRUE); + return (boolean) result; + } + + @Override + public void setStoragePolicy(String src, String policyName) + throws IOException { + checkOperation(OperationCategory.WRITE); + + List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setStoragePolicy", + new Class[] {String.class, String.class}, + new RemoteParam(), policyName); + rpcClient.invokeSequential(locations, method, null, null); + } + + @Override + public BlockStoragePolicy[] getStoragePolicies() throws IOException { + checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getStoragePolicies"); + String ns = 
subclusterResolver.getDefaultNamespace(); + return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method); + } + + @Override // ClientProtocol + public void setPermission(String src, FsPermission permissions) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setPermission", + new Class[] {String.class, FsPermission.class}, + new RemoteParam(), permissions); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public void setOwner(String src, String username, String groupname) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setOwner", + new Class[] {String.class, String.class, String.class}, + new RemoteParam(), username, groupname); + rpcClient.invokeSequential(locations, method); + } + + /** + * Excluded and favored nodes are not verified and will be ignored by + * placement policy if they are not in the same nameservice as the file. + */ + @Override // ClientProtocol + public LocatedBlock addBlock(String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, + String[] favoredNodes, EnumSet addBlockFlags) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("addBlock", + new Class[] {String.class, String.class, ExtendedBlock.class, + DatanodeInfo[].class, long.class, String[].class, + EnumSet.class}, + new RemoteParam(), clientName, previous, excludedNodes, fileId, + favoredNodes, addBlockFlags); + // TODO verify the excludedNodes and favoredNodes are acceptable to this NN + return (LocatedBlock) rpcClient.invokeSequential( + locations, method, LocatedBlock.class, null); + } + + /** + * Excluded nodes are not verified and will be ignored by placement if they + * are not in the same nameservice as the file. 
+ */ + @Override // ClientProtocol + public LocatedBlock getAdditionalDatanode(final String src, final long fileId, + final ExtendedBlock blk, final DatanodeInfo[] existings, + final String[] existingStorageIDs, final DatanodeInfo[] excludes, + final int numAdditionalNodes, final String clientName) + throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getAdditionalDatanode", + new Class[] {String.class, long.class, ExtendedBlock.class, + DatanodeInfo[].class, String[].class, + DatanodeInfo[].class, int.class, String.class}, + new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes, + numAdditionalNodes, clientName); + return (LocatedBlock) rpcClient.invokeSequential( + locations, method, LocatedBlock.class, null); + } + + @Override // ClientProtocol + public void abandonBlock(ExtendedBlock b, long fileId, String src, + String holder) throws IOException { + checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("abandonBlock", + new Class[] {ExtendedBlock.class, long.class, String.class, + String.class}, + b, fileId, new RemoteParam(), holder); + rpcClient.invokeSingle(b, method); + } + + @Override // ClientProtocol + public boolean complete(String src, String clientName, ExtendedBlock last, + long fileId) throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("complete", + new Class[] {String.class, String.class, ExtendedBlock.class, + long.class}, + new RemoteParam(), clientName, last, fileId); + // Complete can return true/false, so don't expect a result + return ((Boolean) rpcClient.invokeSequential( + locations, method, Boolean.class, null)).booleanValue(); + } + + @Override // ClientProtocol + public LocatedBlock updateBlockForPipeline( + ExtendedBlock block, String clientName) throws IOException { + checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("updateBlockForPipeline", + new Class[] {ExtendedBlock.class, String.class}, + block, clientName); + return (LocatedBlock) rpcClient.invokeSingle(block, method); + } + + /** + * Datanodes are not verified to be in the same nameservice as the old block. + * TODO This may require validation. + */ + @Override // ClientProtocol + public void updatePipeline(String clientName, ExtendedBlock oldBlock, + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) + throws IOException { + checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("updatePipeline", + new Class[] {String.class, ExtendedBlock.class, ExtendedBlock.class, + DatanodeID[].class, String[].class}, + clientName, oldBlock, newBlock, newNodes, newStorageIDs); + rpcClient.invokeSingle(oldBlock, method); + } + + @Override // ClientProtocol + public long getPreferredBlockSize(String src) throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("getPreferredBlockSize", + new Class[] {String.class}, new RemoteParam()); + return ((Long) rpcClient.invokeSequential( + locations, method, Long.class, null)).longValue(); + } + + /** + * Determines combinations of eligible src/dst locations for a rename. A + * rename cannot change the namespace. Renames are only allowed if there is an + * eligible dst location in the same namespace as the source. 
+ * + * @param srcLocations List of all potential source destinations where the + * path may be located. On return this list is trimmed to include + * only the paths that have corresponding destinations in the same + * namespace. + * @param dst The destination path. + * @return A map of all eligible source namespaces and their corresponding + * replacement value. + * @throws IOException If the dst paths could not be determined. + */ + private RemoteParam getRenameDestinations( + final List srcLocations, final String dst) + throws IOException { + + final List dstLocations = getLocationsForPath(dst, true); + final Map dstMap = new HashMap<>(); + + Iterator iterator = srcLocations.iterator(); + while (iterator.hasNext()) { + RemoteLocation srcLocation = iterator.next(); + RemoteLocation eligibleDst = + getFirstMatchingLocation(srcLocation, dstLocations); + if (eligibleDst != null) { + // Use this dst for this source location + dstMap.put(srcLocation, eligibleDst.getDest()); + } else { + // This source has no eligible destination; remove it from the list + iterator.remove(); + } + } + return new RemoteParam(dstMap); + } + + /** + * Get first matching location. + * + * @param location Location we are looking for. + * @param locations List of locations. + * @return The first matching location in the list. + */ + private RemoteLocation getFirstMatchingLocation(RemoteLocation location, + List locations) { + for (RemoteLocation loc : locations) { + if (loc.getNameserviceId().equals(location.getNameserviceId())) { + // Return first matching location + return loc; + } + } + return null; + } + + @Deprecated + @Override // ClientProtocol + public boolean rename(final String src, final String dst) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List srcLocations = getLocationsForPath(src, true); + // srcLocations may be trimmed by getRenameDestinations() + final List locs = new LinkedList<>(srcLocations); + RemoteParam dstParam = getRenameDestinations(locs, dst); + if (locs.isEmpty()) { + throw new IOException( + "Rename of " + src + " to " + dst + " is not allowed," + + " no eligible destination in the same namespace was found."); + } + RemoteMethod method = new RemoteMethod("rename", + new Class[] {String.class, String.class}, + new RemoteParam(), dstParam); + return ((Boolean) rpcClient.invokeSequential( + locs, method, Boolean.class, Boolean.TRUE)).booleanValue(); + } + + @Override // ClientProtocol + public void rename2(final String src, final String dst, + final Options.Rename... 
options) throws IOException { + checkOperation(OperationCategory.WRITE); + + final List srcLocations = getLocationsForPath(src, true); + // srcLocations may be trimmed by getRenameDestinations() + final List locs = new LinkedList<>(srcLocations); + RemoteParam dstParam = getRenameDestinations(locs, dst); + if (locs.isEmpty()) { + throw new IOException( + "Rename of " + src + " to " + dst + " is not allowed," + + " no eligible destination in the same namespace was found."); + } + RemoteMethod method = new RemoteMethod("rename2", + new Class[] {String.class, String.class, options.getClass()}, + new RemoteParam(), dstParam, options); + rpcClient.invokeSequential(locs, method, null, null); + } + + @Override // ClientProtocol + public void concat(String trg, String[] src) throws IOException { + checkOperation(OperationCategory.WRITE); + + // See if the src and target files are all in the same namespace + LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1); + if (targetBlocks == null) { + throw new IOException("Cannot locate blocks for target file - " + trg); + } + LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock(); + String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId(); + for (String source : src) { + LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1); + if (sourceBlocks == null) { + throw new IOException( + "Cannot locate blocks for source file " + source); + } + String sourceBlockPoolId = + sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId(); + if (!sourceBlockPoolId.equals(targetBlockPoolId)) { + throw new IOException("Cannot concatenate source file " + source + + " because it is located in a different namespace" + + " with block pool id " + sourceBlockPoolId + + " from the target file with block pool id " + + targetBlockPoolId); + } + } + + // Find locations in the matching namespace. 
+ final RemoteLocation targetDestination = + getLocationForPath(trg, true, targetBlockPoolId); + String[] sourceDestinations = new String[src.length]; + for (int i = 0; i < src.length; i++) { + String sourceFile = src[i]; + RemoteLocation location = + getLocationForPath(sourceFile, true, targetBlockPoolId); + sourceDestinations[i] = location.getDest(); + } + // Invoke + RemoteMethod method = new RemoteMethod("concat", + new Class[] {String.class, String[].class}, + targetDestination.getDest(), sourceDestinations); + rpcClient.invokeSingle(targetDestination, method); + } + + @Override // ClientProtocol + public boolean truncate(String src, long newLength, String clientName) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("truncate", + new Class[] {String.class, long.class, String.class}, + new RemoteParam(), newLength, clientName); + return ((Boolean) rpcClient.invokeSequential(locations, method, + Boolean.class, Boolean.TRUE)).booleanValue(); + } + + @Override // ClientProtocol + public boolean delete(String src, boolean recursive) throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("delete", + new Class[] {String.class, boolean.class}, new RemoteParam(), + recursive); + return ((Boolean) rpcClient.invokeSequential(locations, method, + Boolean.class, Boolean.TRUE)).booleanValue(); + } + + @Override // ClientProtocol + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + if (locations.size() > 1) { + // Check if this directory already exists + try { + HdfsFileStatus fileStatus = getFileInfo(src); + if (fileStatus != null) { + // When existing, the NN doesn't return an exception; return true + return true; + } + } catch (IOException ioe) { + // Can't query if this file exists or not. 
+ LOG.error("Error requesting file info for path {} while proxing mkdirs", + src, ioe); + } + } + + RemoteLocation firstLocation = locations.get(0); + RemoteMethod method = new RemoteMethod("mkdirs", + new Class[] {String.class, FsPermission.class, boolean.class}, + new RemoteParam(), masked, createParent); + return ((Boolean) rpcClient.invokeSingle(firstLocation, method)) + .booleanValue(); + } + + @Override // ClientProtocol + public void renewLease(String clientName) throws IOException { + checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("renewLease", + new Class[] {String.class}, clientName); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, false, false); + } + + @Override // ClientProtocol + public DirectoryListing getListing(String src, byte[] startAfter, + boolean needLocation) throws IOException { + checkOperation(OperationCategory.READ); + + // Locate the dir and fetch the listing + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("getListing", + new Class[] {String.class, startAfter.getClass(), boolean.class}, + new RemoteParam(), startAfter, needLocation); + Map listings = + rpcClient.invokeConcurrent(locations, method, false, false); + + Map nnListing = new TreeMap<>(); + int totalRemainingEntries = 0; + int remainingEntries = 0; + boolean namenodeListingExists = false; + if (listings != null) { + // Check the subcluster listing with the smallest name + String lastName = null; + for (Entry entry : listings.entrySet()) { + RemoteLocation location = entry.getKey(); + DirectoryListing listing = (DirectoryListing) entry.getValue(); + if (listing == null) { + LOG.debug("Cannot get listing from {}", location); + } else { + totalRemainingEntries += listing.getRemainingEntries(); + HdfsFileStatus[] partialListing = listing.getPartialListing(); + int length = partialListing.length; + if (length > 0) { + HdfsFileStatus lastLocalEntry = partialListing[length-1]; + String lastLocalName = lastLocalEntry.getLocalName(); + if (lastName == null || lastName.compareTo(lastLocalName) > 0) { + lastName = lastLocalName; + } + } + } + } + + // Add existing entries + for (Object value : listings.values()) { + DirectoryListing listing = (DirectoryListing) value; + if (listing != null) { + namenodeListingExists = true; + for (HdfsFileStatus file : listing.getPartialListing()) { + String filename = file.getLocalName(); + if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) { + // Discarding entries further than the lastName + remainingEntries++; + } else { + nnListing.put(filename, file); + } + } + remainingEntries += listing.getRemainingEntries(); + } + } + } + + // Add mount points at this level in the tree + final List children = subclusterResolver.getMountPoints(src); + if (children != null) { + // Get the dates for each mount point + Map dates = getMountPointDates(src); + + // Create virtual folder with the mount name + for (String child : children) { + long date = 0; + if (dates != null && dates.containsKey(child)) { + date = dates.get(child); + } + // TODO add number of children + HdfsFileStatus dirStatus = getMountPointStatus(child, 0, date); + + // This may overwrite existing listing entries with the mount point + // TODO don't add if already there? + nnListing.put(child, dirStatus); + } + } + + if (!namenodeListingExists && nnListing.size() == 0) { + // NN returns a null object if the directory cannot be found and has no + // listing. 
If we didn't retrieve any NN listing data, and there are no + // mount points here, return null. + return null; + } + + // Generate combined listing + HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()]; + combinedData = nnListing.values().toArray(combinedData); + return new DirectoryListing(combinedData, remainingEntries); + } + + @Override // ClientProtocol + public HdfsFileStatus getFileInfo(String src) throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class[] {String.class}, new RemoteParam()); + HdfsFileStatus ret = (HdfsFileStatus) rpcClient.invokeSequential( + locations, method, HdfsFileStatus.class, null); + + // If there is no real path, check mount points + if (ret == null) { + List children = subclusterResolver.getMountPoints(src); + if (children != null && !children.isEmpty()) { + Map dates = getMountPointDates(src); + long date = 0; + if (dates != null && dates.containsKey(src)) { + date = dates.get(src); + } + ret = getMountPointStatus(src, children.size(), date); + } + } + + return ret; + } + + @Override // ClientProtocol + public boolean isFileClosed(String src) throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("isFileClosed", + new Class[] {String.class}, new RemoteParam()); + return ((Boolean) rpcClient.invokeSequential( + locations, method, Boolean.class, Boolean.TRUE)).booleanValue(); + } + + @Override // ClientProtocol + public HdfsFileStatus getFileLinkInfo(String src) throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getFileLinkInfo", + new Class[] {String.class}, new RemoteParam()); + return (HdfsFileStatus) rpcClient.invokeSequential( + locations, method, HdfsFileStatus.class, null); + } + + @Override // ClientProtocol + public long[] getStats() throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("getStats"); + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent(nss, method, true, false); + long[] combinedData = new long[STATS_ARRAY_LENGTH]; + for (Object o : results.values()) { + long[] data = (long[]) o; + for (int i = 0; i < combinedData.length && i < data.length; i++) { + if (data[i] >= 0) { + combinedData[i] += data[i]; + } + } + } + return combinedData; + } + + @Override // ClientProtocol + public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) + throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + Map datanodesMap = new LinkedHashMap<>(); + RemoteMethod method = new RemoteMethod("getDatanodeReport", + new Class[] {DatanodeReportType.class}, type); + + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent(nss, method, true, false); + for (Entry entry : results.entrySet()) { + FederationNamespaceInfo ns = entry.getKey(); + DatanodeInfo[] result = (DatanodeInfo[]) entry.getValue(); + for (DatanodeInfo node : result) { + String nodeId = node.getXferAddr(); + if (!datanodesMap.containsKey(nodeId)) { + // Add the subcluster as a suffix to the network location + node.setNetworkLocation( + NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() + + node.getNetworkLocation()); + datanodesMap.put(nodeId, node); + } else { + LOG.debug("{} is 
in multiple subclusters", nodeId); + } + } + } + // Map -> Array + Collection datanodes = datanodesMap.values(); + DatanodeInfo[] combinedData = new DatanodeInfo[datanodes.size()]; + combinedData = datanodes.toArray(combinedData); + return combinedData; + } + + @Override // ClientProtocol + public DatanodeStorageReport[] getDatanodeStorageReport( + DatanodeReportType type) throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + Map datanodesMap = new HashMap<>(); + RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", + new Class[] {DatanodeReportType.class}, type); + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent(nss, method, true, false); + for (Object r : results.values()) { + DatanodeStorageReport[] result = (DatanodeStorageReport[]) r; + for (DatanodeStorageReport node : result) { + String nodeId = node.getDatanodeInfo().getXferAddr(); + if (!datanodesMap.containsKey(nodeId)) { + datanodesMap.put(nodeId, node); + } + // TODO merge somehow, right now it just takes the first one + } + } + + Collection datanodes = datanodesMap.values(); + // TODO sort somehow + DatanodeStorageReport[] combinedData = + new DatanodeStorageReport[datanodes.size()]; + combinedData = datanodes.toArray(combinedData); + return combinedData; + } + + @Override // ClientProtocol + public boolean setSafeMode(SafeModeAction action, boolean isChecked) + throws IOException { + checkOperation(OperationCategory.WRITE); + + // Set safe mode in all the name spaces + RemoteMethod method = new RemoteMethod("setSafeMode", + new Class[] {SafeModeAction.class, boolean.class}, + action, isChecked); + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent(nss, method, true, true); + + // We only report true if all the name spaces are in safe mode + int numSafemode = 0; + for (Object result : results.values()) { + if (result instanceof Boolean) { + boolean safemode = (boolean) result; + if (safemode) { + numSafemode++; + } + } } + return numSafemode == results.size(); + } + + @Override // ClientProtocol + public boolean restoreFailedStorage(String arg) throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("restoreFailedStorage", + new Class[] {String.class}, arg); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false); + + boolean success = true; + Object obj = ret; + @SuppressWarnings("unchecked") + Map results = + (Map)obj; + Collection successes = results.values(); + for (boolean s : successes) { + if (!s) { + success = false; + } + } + return success; + } + + @Override // ClientProtocol + public boolean saveNamespace(long timeWindow, long txGap) throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("saveNamespace", + new Class[] {Long.class, Long.class}, timeWindow, txGap); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false); + + boolean success = true; + Object obj = ret; + @SuppressWarnings("unchecked") + Map results = + (Map)obj; + Collection successes = results.values(); + for (boolean s : successes) { + if (!s) { + success = false; + } + } + return success; + } + + @Override // ClientProtocol + public long rollEdits() throws IOException { + checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("rollEdits", new Class[] {}); + final Set nss = 
namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false); + + // Return the maximum txid + long txid = 0; + Object obj = ret; + @SuppressWarnings("unchecked") + Map results = + (Map)obj; + Collection txids = results.values(); + for (long t : txids) { + if (t > txid) { + txid = t; + } + } + return txid; + } + + @Override // ClientProtocol + public void refreshNodes() throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("refreshNodes", new Class[] {}); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, true); + } + + @Override // ClientProtocol + public void finalizeUpgrade() throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("finalizeUpgrade", + new Class[] {}); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + @Override // ClientProtocol + public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) + throws IOException { + checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("rollingUpgrade", + new Class[] {RollingUpgradeAction.class}, action); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false); + + // Return the first rolling upgrade info + RollingUpgradeInfo info = null; + Object obj = ret; + @SuppressWarnings("unchecked") + Map results = + (Map)obj; + Collection infos = results.values(); + for (RollingUpgradeInfo infoNs : infos) { + if (info == null && infoNs != null) { + info = infoNs; + } + } + return info; + } + + @Override // ClientProtocol + public void metaSave(String filename) throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("metaSave", + new Class[] {String.class}, filename); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + @Override // ClientProtocol + public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) + throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = getLocationsForPath(path, false); + RemoteMethod method = new RemoteMethod("listCorruptFileBlocks", + new Class[] {String.class, String.class}, + new RemoteParam(), cookie); + return (CorruptFileBlocks) rpcClient.invokeSequential( + locations, method, CorruptFileBlocks.class, null); + } + + @Override // ClientProtocol + public void setBalancerBandwidth(long bandwidth) throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("setBalancerBandwidth", + new Class[] {Long.class}, bandwidth); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + @Override // ClientProtocol + public ContentSummary getContentSummary(String path) throws IOException { + checkOperation(OperationCategory.READ); + + // Get the summaries from regular files + Collection summaries = new LinkedList<>(); + FileNotFoundException notFoundException = null; + try { + final List locations = getLocationsForPath(path, false); + RemoteMethod method = new RemoteMethod("getContentSummary", + new Class[] {String.class}, new RemoteParam()); + @SuppressWarnings("unchecked") + Map results = + (Map) ((Object)rpcClient.invokeConcurrent( + locations, method, false, false)); + 
summaries.addAll(results.values()); + } catch (FileNotFoundException e) { + notFoundException = e; + } + + // Add mount points at this level in the tree + final List children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + Path childPath = new Path(path, child); + try { + ContentSummary mountSummary = getContentSummary(childPath.toString()); + if (mountSummary != null) { + summaries.add(mountSummary); + } + } catch (Exception e) { + LOG.error("Cannot get content summary for mount {}: {}", + childPath, e.getMessage()); + } + } + } + + // Throw the original exception if neither the path nor any mount point was found + if (summaries.isEmpty() && notFoundException != null) { + throw notFoundException; + } + + return aggregateContentSummary(summaries); + } + + /** + * Aggregate content summaries for each subcluster. + * + * @param summaries Collection of individual summaries. + * @return Aggregated content summary. + */ + private ContentSummary aggregateContentSummary( + Collection summaries) { + if (summaries.size() == 1) { + return summaries.iterator().next(); + } + + long length = 0; + long fileCount = 0; + long directoryCount = 0; + long quota = 0; + long spaceConsumed = 0; + long spaceQuota = 0; + + for (ContentSummary summary : summaries) { + length += summary.getLength(); + fileCount += summary.getFileCount(); + directoryCount += summary.getDirectoryCount(); + quota += summary.getQuota(); + spaceConsumed += summary.getSpaceConsumed(); + spaceQuota += summary.getSpaceQuota(); + } + + ContentSummary ret = new ContentSummary.Builder() + .length(length) + .fileCount(fileCount) + .directoryCount(directoryCount) + .quota(quota) + .spaceConsumed(spaceConsumed) + .spaceQuota(spaceQuota) + .build(); + return ret; + } + + @Override // ClientProtocol + public void fsync(String src, long fileId, String clientName, + long lastBlockLength) throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("fsync", + new Class[] {String.class, long.class, String.class, long.class }, + new RemoteParam(), fileId, clientName, lastBlockLength); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public void setTimes(String src, long mtime, long atime) throws IOException { + checkOperation(OperationCategory.WRITE); + + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setTimes", + new Class[] {String.class, long.class, long.class}, + new RemoteParam(), mtime, atime); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public void createSymlink(String target, String link, FsPermission dirPerms, + boolean createParent) throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO Verify that the link location is in the same NS as the targets + final List targetLocations = + getLocationsForPath(target, true); + final List linkLocations = + getLocationsForPath(link, true); + RemoteLocation linkLocation = linkLocations.get(0); + RemoteMethod method = new RemoteMethod("createSymlink", + new Class[] {String.class, String.class, FsPermission.class, + boolean.class}, + new RemoteParam(), linkLocation.getDest(), dirPerms, createParent); + rpcClient.invokeSequential(targetLocations, method); + } + + @Override // ClientProtocol + public String getLinkTarget(String path) throws IOException { + checkOperation(OperationCategory.READ); + + final List locations = 
getLocationsForPath(path, true); + RemoteMethod method = new RemoteMethod("getLinkTarget", + new Class[] {String.class}, new RemoteParam()); + return (String) rpcClient.invokeSequential( + locations, method, String.class, null); + } + + @Override // ClientProtocol + public void allowSnapshot(String snapshotRoot) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public void disallowSnapshot(String snapshot) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public void renameSnapshot(String snapshotRoot, String snapshotOldName, + String snapshotNewName) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public SnapshottableDirectoryStatus[] getSnapshottableDirListing() + throws IOException { + checkOperation(OperationCategory.READ, false); + return null; + } + + @Override // ClientProtocol + public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, + String earlierSnapshotName, String laterSnapshotName) throws IOException { + checkOperation(OperationCategory.READ, false); + return null; + } + + @Override // ClientProtocol + public long addCacheDirective(CacheDirectiveInfo path, + EnumSet flags) throws IOException { + checkOperation(OperationCategory.WRITE, false); + return 0; + } + + @Override // ClientProtocol + public void modifyCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public void removeCacheDirective(long id) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public BatchedEntries listCacheDirectives( + long prevId, CacheDirectiveInfo filter) throws IOException { + checkOperation(OperationCategory.READ, false); + return null; + } + + @Override // ClientProtocol + public void addCachePool(CachePoolInfo info) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public void modifyCachePool(CachePoolInfo info) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public void removeCachePool(String cachePoolName) throws IOException { + checkOperation(OperationCategory.WRITE, false); + } + + @Override // ClientProtocol + public BatchedEntries listCachePools(String prevKey) + throws IOException { + checkOperation(OperationCategory.READ, false); + return null; + } + + @Override // ClientProtocol + public void modifyAclEntries(String src, List aclSpec) + throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("modifyAclEntries", + new Class[] {String.class, List.class}, + new RemoteParam(), aclSpec); + rpcClient.invokeSequential(locations, method, null, null); + } + + @Override // ClientProtocol + public void removeAclEntries(String src, List aclSpec) + throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeAclEntries", + new Class[] {String.class, List.class}, + new RemoteParam(), aclSpec); + rpcClient.invokeSequential(locations, method, null, null); + } + + @Override // ClientProtocol + public void removeDefaultAcl(String 
src) throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeDefaultAcl", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public void removeAcl(String src) throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeAcl", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public void setAcl(String src, List aclSpec) throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod( + "setAcl", new Class[] {String.class, List.class}, + new RemoteParam(), aclSpec); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public AclStatus getAclStatus(String src) throws IOException { + checkOperation(OperationCategory.READ); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getAclStatus", + new Class[] {String.class}, new RemoteParam()); + return (AclStatus) rpcClient.invokeSequential( + locations, method, AclStatus.class, null); + } + + @Override // ClientProtocol + public void createEncryptionZone(String src, String keyName) + throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("createEncryptionZone", + new Class[] {String.class, String.class}, + new RemoteParam(), keyName); + rpcClient.invokeSequential(locations, method); + } + + @Override // ClientProtocol + public EncryptionZone getEZForPath(String src) throws IOException { + checkOperation(OperationCategory.READ); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getEZForPath", + new Class[] {String.class}, new RemoteParam()); + return (EncryptionZone) rpcClient.invokeSequential( + locations, method, EncryptionZone.class, null); + } + + @Override // ClientProtocol + public BatchedEntries listEncryptionZones(long prevId) + throws IOException { + checkOperation(OperationCategory.READ, false); + return null; + } + + @Override // ClientProtocol + public void setXAttr(String src, XAttr xAttr, EnumSet flag) + throws IOException { + checkOperation(OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setXAttr", + new Class[] {String.class, XAttr.class, EnumSet.class}, + new RemoteParam(), xAttr, flag); + rpcClient.invokeSequential(locations, method); + } + + @SuppressWarnings("unchecked") + @Override // ClientProtocol + public List getXAttrs(String src, List xAttrs) + throws IOException { + checkOperation(OperationCategory.READ); + + // TODO handle virtual directories + final List locations = getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getXAttrs", + new Class[] {String.class, List.class}, new RemoteParam(), xAttrs); + return (List) 
+  @SuppressWarnings("unchecked")
+  @Override // ClientProtocol
+  public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
+      throws IOException {
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getXAttrs",
+        new Class<?>[] {String.class, List.class}, new RemoteParam(), xAttrs);
+    return (List<XAttr>) rpcClient.invokeSequential(
+        locations, method, List.class, null);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override // ClientProtocol
+  public List<XAttr> listXAttrs(String src) throws IOException {
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("listXAttrs",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return (List<XAttr>) rpcClient.invokeSequential(
+        locations, method, List.class, null);
+  }
+
+  @Override // ClientProtocol
+  public void removeXAttr(String src, XAttr xAttr) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("removeXAttr",
+        new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
+    rpcClient.invokeSequential(locations, method);
+  }
+
+  @Override // ClientProtocol
+  public void checkAccess(String path, FsAction mode) throws IOException {
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(path, true);
+    RemoteMethod method = new RemoteMethod("checkAccess",
+        new Class<?>[] {String.class, FsAction.class},
+        new RemoteParam(), mode);
+    rpcClient.invokeSequential(locations, method);
+  }
+
+  @Override // ClientProtocol
+  public long getCurrentEditLogTxid() throws IOException {
+    checkOperation(OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod(
+        "getCurrentEditLogTxid", new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, Object> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false);
+
+    // Return the maximum txid
+    long txid = 0;
+    Object obj = ret;
+    @SuppressWarnings("unchecked")
+    Map<FederationNamespaceInfo, Long> results =
+        (Map<FederationNamespaceInfo, Long>)obj;
+    Collection<Long> txids = results.values();
+    for (long t : txids) {
+      if (t > txid) {
+        txid = t;
+      }
+    }
+    return txid;
+  }
+
+  @Override // ClientProtocol
+  public EventBatchList getEditsFromTxid(long txid) throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  @Override
+  public DataEncryptionKey getDataEncryptionKey() throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  @Override
+  public String createSnapshot(String snapshotRoot, String snapshotName)
+      throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    return null;
+  }
+
+  @Override
+  public void deleteSnapshot(String snapshotRoot, String snapshotName)
+      throws IOException {
+    checkOperation(OperationCategory.WRITE, false);
+  }
+
+  @Override
+  public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  @Override // ClientProtocol
+  public ErasureCodingPolicy getErasureCodingPolicy(String src)
+      throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  @Override // ClientProtocol
+  public void setErasureCodingPolicy(String src, String ecPolicyName)
+      throws IOException {
+    checkOperation(OperationCategory.WRITE, false);
+  }
+
+  @Override // ClientProtocol
+  public AddingECPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    checkOperation(OperationCategory.WRITE, false);
+    return null;
+  }
+
+  @Override // ClientProtocol
+  public void unsetErasureCodingPolicy(String src) throws IOException {
+    checkOperation(OperationCategory.WRITE, false);
+  }
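
getCurrentEditLogTxid above is a fan-out call: it asks every registered namespace concurrently and then reduces the answers to a single value. The reduction is simply the maximum of the returned transaction ids, as in this standalone sketch (keyed by a plain nameservice-id string for brevity; the router keys the result map by FederationNamespaceInfo):

    import java.util.Collection;
    import java.util.Map;

    final class TxidAggregation {

      // Keep the largest txid reported by any namespace; zero if none.
      static long maxTxid(Map<String, Long> txidPerNamespace) {
        long txid = 0;
        Collection<Long> txids = txidPerNamespace.values();
        for (long t : txids) {
          if (t > txid) {
            txid = t;
          }
        }
        return txid;
      }
    }
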
+  @Override // ClientProtocol
+  public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
+      StorageType type) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO assign global replicas instead of applying them to each folder
+    final List<RemoteLocation> locations = getLocationsForPath(path, true);
+    RemoteMethod method = new RemoteMethod("setQuota",
+        new Class<?>[] {String.class, Long.class, Long.class,
+            StorageType.class},
+        new RemoteParam(), namespaceQuota, storagespaceQuota, type);
+    rpcClient.invokeConcurrent(locations, method, false, false);
+  }
+
+  @Override // ClientProtocol
+  public QuotaUsage getQuotaUsage(String path) throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+
+    // Block pool id -> blocks
+    Map<String, List<LocatedBlock>> blockLocations = new HashMap<>();
+    for (LocatedBlock block : blocks) {
+      String bpId = block.getBlock().getBlockPoolId();
+      List<LocatedBlock> bpBlocks = blockLocations.get(bpId);
+      if (bpBlocks == null) {
+        bpBlocks = new LinkedList<>();
+        blockLocations.put(bpId, bpBlocks);
+      }
+      bpBlocks.add(block);
+    }
+
+    // Invoke each block pool
+    for (Entry<String, List<LocatedBlock>> entry : blockLocations.entrySet()) {
+      String bpId = entry.getKey();
+      List<LocatedBlock> bpBlocks = entry.getValue();
+
+      LocatedBlock[] bpBlocksArray =
+          bpBlocks.toArray(new LocatedBlock[bpBlocks.size()]);
+      RemoteMethod method = new RemoteMethod("reportBadBlocks",
+          new Class<?>[] {LocatedBlock[].class},
+          new Object[] {bpBlocksArray});
+      rpcClient.invokeSingleBlockPool(bpId, method);
+    }
+  }
+
+  @Override
+  public void unsetStoragePolicy(String src) throws IOException {
+    checkOperation(OperationCategory.WRITE, false);
+  }
+
+  @Override
+  public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  /**
+   * Locate the location with the matching block pool id.
+   *
+   * @param path Path to check.
+   * @param failIfLocked Fail the request if locked (top mount point).
+   * @param blockPoolId The block pool ID of the namespace to search for.
+   * @return The remote location for the path in the matching subcluster.
+   * @throws IOException if the location for this path cannot be determined.
+   */
+  private RemoteLocation getLocationForPath(
+      String path, boolean failIfLocked, String blockPoolId)
+      throws IOException {
+
+    final List<RemoteLocation> locations =
+        getLocationsForPath(path, failIfLocked);
+
+    String nameserviceId = null;
+    Set<FederationNamespaceInfo> namespaces =
+        this.namenodeResolver.getNamespaces();
+    for (FederationNamespaceInfo namespace : namespaces) {
+      if (namespace.getBlockPoolId().equals(blockPoolId)) {
+        nameserviceId = namespace.getNameserviceId();
+        break;
+      }
+    }
+    if (nameserviceId != null) {
+      for (RemoteLocation location : locations) {
+        if (location.getNameserviceId().equals(nameserviceId)) {
+          return location;
+        }
+      }
+    }
+    throw new IOException(
+        "Cannot locate a nameservice for block pool " + blockPoolId);
+  }
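
reportBadBlocks above cannot be proxied to a single namespace, because one client report may span several subclusters; the router therefore first buckets the blocks by block pool id and then issues one upstream call per pool. The bucketing step in isolation, with BadBlock as a hypothetical stand-in for LocatedBlock:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    final class BlockPoolBuckets {

      static class BadBlock {
        final String blockPoolId;
        BadBlock(String blockPoolId) {
          this.blockPoolId = blockPoolId;
        }
      }

      // Group reported blocks by block pool id so that each subcluster
      // only receives the blocks that belong to it.
      static Map<String, List<BadBlock>> groupByPool(BadBlock[] blocks) {
        Map<String, List<BadBlock>> buckets = new HashMap<>();
        for (BadBlock block : blocks) {
          List<BadBlock> bpBlocks = buckets.get(block.blockPoolId);
          if (bpBlocks == null) {
            bpBlocks = new LinkedList<>();
            buckets.put(block.blockPoolId, bpBlocks);
          }
          bpBlocks.add(block);
        }
        return buckets;
      }
    }
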
+  /**
+   * Get the possible locations of a path in the federated cluster.
+   *
+   * @param path Path to check.
+   * @param failIfLocked Fail the request if locked (top mount point).
+   * @return Prioritized list of locations in the federated cluster.
+   * @throws IOException If the location for this path cannot be determined.
+   */
+  private List<RemoteLocation> getLocationsForPath(
+      String path, boolean failIfLocked) throws IOException {
+    // Check the location for this path
+    final PathLocation location =
+        this.subclusterResolver.getDestinationForPath(path);
+    if (location == null) {
+      throw new IOException("Cannot find locations for " + path + " in " +
+          this.subclusterResolver);
+    }
+
+    // Return the prioritized destinations for this path
+    return location.getDestinations();
+  }
+
+  /**
+   * Get the modification dates for mount points.
+   *
+   * @param path Name of the path to start checking dates from.
+   * @return Map with the modification dates for all sub-entries.
+   */
+  private Map<String, Long> getMountPointDates(String path) {
+    Map<String, Long> ret = new TreeMap<>();
+    // TODO add when we have a Mount Table
+    return ret;
+  }
+
+  /**
+   * Create a new file status for a mount point.
+   *
+   * @param name Name of the mount point.
+   * @param childrenNum Number of children.
+   * @param date Modification and access date for the mount point.
+   * @return New HDFS file status representing a mount point.
+   */
+  private HdfsFileStatus getMountPointStatus(
+      String name, int childrenNum, long date) {
+    long modTime = date;
+    long accessTime = date;
+    FsPermission permission = FsPermission.getDirDefault();
+    String owner = this.superUser;
+    String group = this.superGroup;
+    try {
+      // TODO support users, it should be the user for the pointed folder
+      UserGroupInformation ugi = getRemoteUser();
+      owner = ugi.getUserName();
+      group = ugi.getPrimaryGroupName();
+    } catch (IOException e) {
+      LOG.error("Cannot get the remote user: {}", e.getMessage());
+    }
+    long inodeId = 0;
+    return new HdfsFileStatus(0, true, 0, 0, modTime, accessTime, permission,
+        owner, group, new byte[0], DFSUtil.string2Bytes(name), inodeId,
+        childrenNum, null, (byte) 0, null);
+  }
+
+  /**
+   * Get the name of the method that is calling this function.
+   *
+   * @return Name of the method calling this function.
+   */
+  private static String getMethodName() {
+    final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    String methodName = stack[3].getMethodName();
+    return methodName;
+  }
+
+  /**
+   * Get the user that is invoking this operation.
+   *
+   * @return Remote user group information.
+   * @throws IOException If we cannot get the user information.
+   */
+  static UserGroupInformation getRemoteUser() throws IOException {
+    UserGroupInformation ugi = Server.getRemoteUser();
+    return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
+  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8854913c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 02140a0..257751b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4556,6 +4556,101 @@
+<property>
+  <name>dfs.federation.router.default.nameserviceId</name>
+  <value></value>
+  <description>
+    Nameservice identifier of the default subcluster to monitor.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.rpc.enable</name>
+  <value>true</value>
+  <description>
+    If the RPC service to handle client requests in the router is enabled.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.rpc-address</name>
+  <value>0.0.0.0:8888</value>
+  <description>
+    RPC address that handles all client requests.
+    The value of this property will take the form of router-host1:rpc-port.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.rpc-bind-host</name>
+  <value></value>
+  <description>
+    The actual address the RPC server will bind to. If this optional address
+    is set, it overrides only the hostname portion of
+    dfs.federation.router.rpc-address. This is useful for making the router
+    listen on all interfaces by setting it to 0.0.0.0.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.handler.count</name>
+  <value>10</value>
+  <description>
+    The number of server threads for the router to handle RPC requests from
+    clients.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.handler.queue.size</name>
+  <value>100</value>
+  <description>
+    The size of the queue of pending RPC client requests for the handler
+    threads.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.reader.count</name>
+  <value>1</value>
+  <description>
+    The number of reader threads for the router to handle RPC client requests.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.reader.queue.size</name>
+  <value>100</value>
+  <description>
+    The size of the queue of pending RPC client requests for the reader
+    threads.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.connection.pool-size</name>
+  <value>1</value>
+  <description>
+    Size of the pool of connections from the router to namenodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.connection.clean.ms</name>
+  <value>10000</value>
+  <description>
+    Time interval, in milliseconds, to check if the connection pool should
+    remove unused connections.
+  </description>
+</property>
+
+<property>
+  <name>dfs.federation.router.connection.pool.clean.ms</name>
+  <value>60000</value>
+  <description>
+    Time interval, in milliseconds, to check if the connection manager should
+    remove unused connection pools.
+  </description>
+</property>
+
 <property>
   <name>dfs.federation.router.file.resolver.client.class</name>
   <value>org.apache.hadoop.hdfs.server.federation.MockResolver</value>
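
These sizing knobs are read through the ordinary Configuration API on the router side. A small sketch of such lookups, using the key strings and defaults from the hdfs-default.xml entries above (the RouterRpcSettings class itself is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    final class RouterRpcSettings {

      // Number of RPC handler threads serving client requests.
      static int handlerCount(Configuration conf) {
        return conf.getInt("dfs.federation.router.handler.count", 10);
      }

      // Queue size of pending client requests for the handlers.
      static int handlerQueueSize(Configuration conf) {
        return conf.getInt("dfs.federation.router.handler.queue.size", 100);
      }

      // Number of reader threads accepting client requests.
      static int readerCount(Configuration conf) {
        return conf.getInt("dfs.federation.router.reader.count", 1);
      }

      // Queue size of pending client requests for the readers.
      static int readerQueueSize(Configuration conf) {
        return conf.getInt("dfs.federation.router.reader.queue.size", 100);
      }
    }
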

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8854913c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index 8674682..bbb548ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
@@ -51,17 +52,11 @@ import org.apache.hadoop.security.AccessControlException;
  */
 public final class FederationTestUtils {
 
-  public final static String NAMESERVICE1 = "ns0";
-  public final static String NAMESERVICE2 = "ns1";
-  public final static String NAMENODE1 = "nn0";
-  public final static String NAMENODE2 = "nn1";
-  public final static String NAMENODE3 = "nn2";
-  public final static String NAMENODE4 = "nn3";
-  public final static String ROUTER1 = "router0";
-  public final static String ROUTER2 = "router1";
-  public final static String ROUTER3 = "router2";
-  public final static String ROUTER4 = "router3";
-  public final static long BLOCK_SIZE_BYTES = 134217728;
+  public final static String[] NAMESERVICES = {"ns0", "ns1"};
+  public final static String[] NAMENODES = {"nn0", "nn1", "nn2", "nn3"};
+  public final static String[] ROUTERS =
+      {"router0", "router1", "router2", "router3"};
 
   private FederationTestUtils() {
     // Utility class
@@ -81,7 +76,7 @@ public final class FederationTestUtils {
       triggeredException = e;
     }
     if (exceptionClass != null) {
-      assertNotNull("No exception was triggered, expected exception - "
+      assertNotNull("No exception was triggered, expected exception "
          + exceptionClass.getName(), triggeredException);
       assertEquals(exceptionClass, triggeredException.getClass());
     } else {
@@ -101,48 +96,43 @@ public final class FederationTestUtils {
       return report;
     }
     report.setHAServiceState(state);
-    report.setNamespaceInfo(new NamespaceInfo(1, "tesclusterid", ns, 0,
-        "testbuildvesion", "testsoftwareversion"));
+    NamespaceInfo nsInfo = new NamespaceInfo(
+        1, "testclusterid", ns, 0, "testbuildversion", "testsoftwareversion");
+    report.setNamespaceInfo(nsInfo);
     return report;
   }
 
   public static void waitNamenodeRegistered(ActiveNamenodeResolver resolver,
-      String nameserviceId, String namenodeId,
-      FederationNamenodeServiceState finalState)
+      String nsId, String nnId, FederationNamenodeServiceState finalState)
       throws InterruptedException, IllegalStateException, IOException {
 
     for (int loopCount = 0; loopCount < 20; loopCount++) {
-
       if (loopCount > 0) {
         Thread.sleep(1000);
      }
 
-      List<? extends FederationNamenodeContext> namenodes;
-      namenodes =
-          resolver.getNamenodesForNameserviceId(nameserviceId);
+      List<? extends FederationNamenodeContext> namenodes =
+          resolver.getNamenodesForNameserviceId(nsId);
       for (FederationNamenodeContext namenode : namenodes) {
-        if (namenodeId != null
-            && !namenode.getNamenodeId().equals(namenodeId)) {
-          // Keep looking
-          continue;
+        // Check if this is the Namenode we are checking
+        if (namenode.getNamenodeId() == nnId ||
+            namenode.getNamenodeId().equals(nnId)) {
+          if (finalState != null && !namenode.getState().equals(finalState)) {
+            // Wrong state, wait a bit more
+            break;
+          } else {
+            // Found and verified
+            return;
+          }
         }
-        if (finalState != null && !namenode.getState().equals(finalState)) {
-          // Wrong state, wait a bit more
-          break;
-        }
-        // Found
-        return;
       }
     }
-    assertTrue("Failed to verify state store registration for state - "
-        + finalState + " - " + " - " + nameserviceId + " - ", false);
+    fail("Failed to verify State Store registration of " + nsId + " " + nnId +
+        " for state " + finalState);
   }
 
   public static boolean verifyDate(Date d1, Date d2, long precision) {
-    if (Math.abs(d1.getTime() - d2.getTime()) < precision) {
-      return true;
-    }
-    return false;
+    return Math.abs(d1.getTime() - d2.getTime()) < precision;
   }
 
   public static boolean addDirectory(FileSystem context, String path)
@@ -165,15 +155,14 @@ public final class FederationTestUtils {
     } catch (Exception e) {
       return false;
     }
-    return false;
   }
 
-  public static boolean checkForFileInDirectory(FileSystem context,
-      String testPath, String targetFile) throws AccessControlException,
-      FileNotFoundException,
-      UnsupportedFileSystemException, IllegalArgumentException,
-      IOException {
+  public static boolean checkForFileInDirectory(
+      FileSystem context, String testPath, String targetFile)
+      throws IOException, AccessControlException, FileNotFoundException,
+      UnsupportedFileSystemException, IllegalArgumentException {
+
     FileStatus[] fileStatus = context.listStatus(new Path(testPath));
     String file = null;
     String verifyPath = testPath + "/" + targetFile;
@@ -194,7 +183,8 @@ public final class FederationTestUtils {
 
   public static int countContents(FileSystem context, String testPath)
       throws IOException {
-    FileStatus[] fileStatus = context.listStatus(new Path(testPath));
+    Path path = new Path(testPath);
+    FileStatus[] fileStatus = context.listStatus(path);
     return fileStatus.length;
   }
 
@@ -202,7 +192,7 @@ public final class FederationTestUtils {
       throws IOException {
     FsPermission permissions = new FsPermission("700");
     FSDataOutputStream writeStream = fs.create(new Path(path), permissions,
-        true, 1000, (short) 1, BLOCK_SIZE_BYTES, null);
+        true, 1000, (short) 1, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
     for (int i = 0; i < length; i++) {
       writeStream.write(i);
     }
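
The rewritten waitNamenodeRegistered in the test utilities above follows a common bounded-polling shape: re-check a condition up to 20 times with a one-second sleep between attempts, then fail loudly once the budget is exhausted. The shape in isolation, with illustrative names (PollUntil is not part of the patch):

    import java.util.concurrent.Callable;

    final class PollUntil {

      // Re-evaluate the condition up to 20 times, sleeping one second
      // between attempts; throw once the time budget is exhausted.
      static void poll(Callable<Boolean> condition, String failureMessage)
          throws Exception {
        for (int loopCount = 0; loopCount < 20; loopCount++) {
          if (loopCount > 0) {
            Thread.sleep(1000);
          }
          if (condition.call()) {
            return; // condition reached within the time budget
          }
        }
        throw new AssertionError(failureMessage);
      }
    }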