Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1AC77200D19 for ; Fri, 1 Sep 2017 04:41:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1928516C60E; Fri, 1 Sep 2017 02:41:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 99EFC16C604 for ; Fri, 1 Sep 2017 04:41:05 +0200 (CEST) Received: (qmail 52556 invoked by uid 500); 1 Sep 2017 02:40:49 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 36480 invoked by uid 99); 1 Sep 2017 02:40:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Sep 2017 02:40:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 508A8F5601; Fri, 1 Sep 2017 02:40:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Fri, 01 Sep 2017 02:41:29 -0000 Message-Id: <98b5853be3cb40729f75e1e6ff090fd0@git.apache.org> In-Reply-To: <6190e5c5a16442f7b73137990f62c7a0@git.apache.org> References: <6190e5c5a16442f7b73137990f62c7a0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [62/73] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri. 
archived-at: Fri, 01 Sep 2017 02:41:08 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index ee6f57d..2875750 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time; /** * In-memory cache/mock of a namenode and file resolver. Stores the most - * recently updated NN information for each nameservice and block pool. Also + * recently updated NN information for each nameservice and block pool. It also * stores a virtual mount table for resolving global namespace paths to local NN * paths. 
*/ @@ -51,82 +51,93 @@ public class MockResolver implements ActiveNamenodeResolver, FileSubclusterResolver { private Map> resolver = - new HashMap>(); - private Map> locations = - new HashMap>(); - private Set namespaces = - new HashSet(); + new HashMap<>(); + private Map> locations = new HashMap<>(); + private Set namespaces = new HashSet<>(); private String defaultNamespace = null; + public MockResolver(Configuration conf, StateStoreService store) { this.cleanRegistrations(); } - public void addLocation(String mount, String nameservice, String location) { - RemoteLocation remoteLocation = new RemoteLocation(nameservice, location); - List locationsList = locations.get(mount); + public void addLocation(String mount, String nsId, String location) { + List locationsList = this.locations.get(mount); if (locationsList == null) { - locationsList = new LinkedList(); - locations.put(mount, locationsList); + locationsList = new LinkedList<>(); + this.locations.put(mount, locationsList); } + + final RemoteLocation remoteLocation = new RemoteLocation(nsId, location); if (!locationsList.contains(remoteLocation)) { locationsList.add(remoteLocation); } if (this.defaultNamespace == null) { - this.defaultNamespace = nameservice; + this.defaultNamespace = nsId; } } public synchronized void cleanRegistrations() { - this.resolver = - new HashMap>(); - this.namespaces = new HashSet(); + this.resolver = new HashMap<>(); + this.namespaces = new HashSet<>(); } @Override public void updateActiveNamenode( - String ns, InetSocketAddress successfulAddress) { + String nsId, InetSocketAddress successfulAddress) { String address = successfulAddress.getHostName() + ":" + successfulAddress.getPort(); - String key = ns; + String key = nsId; if (key != null) { // Update the active entry @SuppressWarnings("unchecked") - List iterator = - (List) resolver.get(key); - for (FederationNamenodeContext namenode : iterator) { + List namenodes = + (List) this.resolver.get(key); + for 
(FederationNamenodeContext namenode : namenodes) { if (namenode.getRpcAddress().equals(address)) { MockNamenodeContext nn = (MockNamenodeContext) namenode; nn.setState(FederationNamenodeServiceState.ACTIVE); break; } } - Collections.sort(iterator, new NamenodePriorityComparator()); + // This operation modifies the list so we need to be careful + synchronized(namenodes) { + Collections.sort(namenodes, new NamenodePriorityComparator()); + } } } @Override public List getNamenodesForNameserviceId(String nameserviceId) { - return resolver.get(nameserviceId); + // Return a copy of the list because it is updated periodically + List namenodes = + this.resolver.get(nameserviceId); + return Collections.unmodifiableList(new ArrayList<>(namenodes)); } @Override public List getNamenodesForBlockPoolId( String blockPoolId) { - return resolver.get(blockPoolId); + // Return a copy of the list because it is updated periodically + List namenodes = + this.resolver.get(blockPoolId); + return Collections.unmodifiableList(new ArrayList<>(namenodes)); } private static class MockNamenodeContext implements FederationNamenodeContext { + + private String namenodeId; + private String nameserviceId; + private String webAddress; private String rpcAddress; private String serviceAddress; private String lifelineAddress; - private String namenodeId; - private String nameserviceId; + private FederationNamenodeServiceState state; private long dateModified; @@ -197,6 +208,7 @@ public class MockResolver @Override public synchronized boolean registerNamenode(NamenodeStatusReport report) throws IOException { + MockNamenodeContext context = new MockNamenodeContext( report.getRpcAddress(), report.getServiceAddress(), report.getLifelineAddress(), report.getWebAddress(), @@ -205,13 +217,14 @@ public class MockResolver String nsId = report.getNameserviceId(); String bpId = report.getBlockPoolId(); String cId = report.getClusterId(); + @SuppressWarnings("unchecked") List existingItems = - (List) 
resolver.get(nsId); + (List) this.resolver.get(nsId); if (existingItems == null) { - existingItems = new ArrayList(); - resolver.put(bpId, existingItems); - resolver.put(nsId, existingItems); + existingItems = new ArrayList<>(); + this.resolver.put(bpId, existingItems); + this.resolver.put(nsId, existingItems); } boolean added = false; for (int i=0; i namespaceSet = new HashSet(); - LinkedList remoteLocations = - new LinkedList(); - for(String key : this.locations.keySet()) { - if(path.startsWith(key)) { + Set namespaceSet = new HashSet<>(); + List remoteLocations = new LinkedList<>(); + for (String key : this.locations.keySet()) { + if (path.startsWith(key)) { for (RemoteLocation location : this.locations.get(key)) { - finalPath = location.getDest() + path.substring(key.length()); - nameservice = location.getNameserviceId(); + String finalPath = location.getDest() + path.substring(key.length()); + String nameservice = location.getNameserviceId(); RemoteLocation remoteLocation = new RemoteLocation(nameservice, finalPath); remoteLocations.add(remoteLocation); @@ -265,7 +275,7 @@ public class MockResolver @Override public List getMountPoints(String path) throws IOException { - List mounts = new ArrayList(); + List mounts = new ArrayList<>(); if (path.equals("/")) { // Mounts only supported under root level for (String mount : this.locations.keySet()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 16d624c..39fcf7a 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; /** * Constructs a router configuration with individual features enabled/disabled. @@ -26,15 +27,32 @@ public class RouterConfigBuilder { private Configuration conf; + private boolean enableRpcServer = false; + public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; } public RouterConfigBuilder() { - this.conf = new Configuration(); + this.conf = new Configuration(false); + } + + public RouterConfigBuilder all() { + this.enableRpcServer = true; + return this; + } + + public RouterConfigBuilder rpc(boolean enable) { + this.enableRpcServer = enable; + return this; + } + + public RouterConfigBuilder rpc() { + return this.rpc(true); } public Configuration build() { + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer); return conf; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java index 55d04ad..4031b7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -17,27 +17,44 @@ */ package org.apache.hadoop.hdfs.server.federation; +import 
static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import 
org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -46,16 +63,49 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Test utility to mimic a federated HDFS cluster with a router. + * Test utility to mimic a federated HDFS cluster with multiple routers. */ public class RouterDFSCluster { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterDFSCluster.class); + + public static final String TEST_STRING = "teststring"; + public static final String TEST_DIR = "testdir"; + public static final String TEST_FILE = "testfile"; + + + /** Nameservices in the federated cluster. */ + private List nameservices; + /** Namenodes in the federated cluster. */ + private List namenodes; + /** Routers in the federated cluster. 
*/ + private List routers; + /** If the Namenodes are in high availability.*/ + private boolean highAvailability; + + /** Mini cluster. */ + private MiniDFSCluster cluster; + + /** Router configuration overrides. */ + private Configuration routerOverrides; + /** Namenode configuration overrides. */ + private Configuration namenodeOverrides; + + /** * Router context. */ @@ -69,13 +119,14 @@ public class RouterDFSCluster { private Configuration conf; private URI fileSystemUri; - public RouterContext(Configuration conf, String ns, String nn) + public RouterContext(Configuration conf, String nsId, String nnId) throws URISyntaxException { - this.namenodeId = nn; - this.nameserviceId = ns; this.conf = conf; - router = new Router(); - router.init(conf); + this.nameserviceId = nsId; + this.namenodeId = nnId; + + this.router = new Router(); + this.router.init(conf); } public Router getRouter() { @@ -99,18 +150,30 @@ public class RouterDFSCluster { } public void initRouter() throws URISyntaxException { + // Store the bound points for the router interfaces + InetSocketAddress rpcAddress = router.getRpcServerAddress(); + if (rpcAddress != null) { + this.rpcPort = rpcAddress.getPort(); + this.fileSystemUri = + URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress)); + // Override the default FS to point to the router RPC + DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + try { + this.fileContext = FileContext.getFileContext(conf); + } catch (UnsupportedFileSystemException e) { + this.fileContext = null; + } + } } - public DistributedFileSystem getFileSystem() throws IOException { - DistributedFileSystem fs = - (DistributedFileSystem) DistributedFileSystem.get(conf); - return fs; + public FileSystem getFileSystem() throws IOException { + return DistributedFileSystem.get(conf); } public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { - LOG.info("Connecting to router at " + fileSystemUri); + 
LOG.info("Connecting to router at {}", fileSystemUri); return user.doAs(new PrivilegedExceptionAction() { @Override public DFSClient run() throws IOException { @@ -120,9 +183,8 @@ public class RouterDFSCluster { } public DFSClient getClient() throws IOException, URISyntaxException { - if (client == null) { - LOG.info("Connecting to router at " + fileSystemUri); + LOG.info("Connecting to router at {}", fileSystemUri); client = new DFSClient(fileSystemUri, conf); } return client; @@ -130,9 +192,10 @@ public class RouterDFSCluster { } /** - * Namenode context. + * Namenode context in the federated cluster. */ public class NamenodeContext { + private Configuration conf; private NameNode namenode; private String nameserviceId; private String namenodeId; @@ -143,14 +206,13 @@ public class RouterDFSCluster { private int httpPort; private URI fileSystemUri; private int index; - private Configuration conf; private DFSClient client; - public NamenodeContext(Configuration conf, String ns, String nn, - int index) { + public NamenodeContext( + Configuration conf, String nsId, String nnId, int index) { this.conf = conf; - this.namenodeId = nn; - this.nameserviceId = ns; + this.nameserviceId = nsId; + this.namenodeId = nnId; this.index = index; } @@ -170,20 +232,19 @@ public class RouterDFSCluster { return this.fileContext; } - public void setNamenode(NameNode n) throws URISyntaxException { - namenode = n; + public void setNamenode(NameNode nn) throws URISyntaxException { + this.namenode = nn; - // Store the bound ports and override the default FS with the local NN's - // RPC - rpcPort = n.getNameNodeAddress().getPort(); - servicePort = n.getServiceRpcAddress().getPort(); - lifelinePort = n.getServiceRpcAddress().getPort(); - httpPort = n.getHttpAddress().getPort(); - fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); - DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + // Store the bound ports and override the default FS with the local NN RPC + this.rpcPort 
= nn.getNameNodeAddress().getPort(); + this.servicePort = nn.getServiceRpcAddress().getPort(); + this.lifelinePort = nn.getServiceRpcAddress().getPort(); + this.httpPort = nn.getHttpAddress().getPort(); + this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); + DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri); try { - this.fileContext = FileContext.getFileContext(conf); + this.fileContext = FileContext.getFileContext(this.conf); } catch (UnsupportedFileSystemException e) { this.fileContext = null; } @@ -205,10 +266,8 @@ public class RouterDFSCluster { return namenode.getHttpAddress().getHostName() + ":" + httpPort; } - public DistributedFileSystem getFileSystem() throws IOException { - DistributedFileSystem fs = - (DistributedFileSystem) DistributedFileSystem.get(conf); - return fs; + public FileSystem getFileSystem() throws IOException { + return DistributedFileSystem.get(conf); } public void resetClient() { @@ -218,7 +277,7 @@ public class RouterDFSCluster { public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { - LOG.info("Connecting to namenode at " + fileSystemUri); + LOG.info("Connecting to namenode at {}", fileSystemUri); return user.doAs(new PrivilegedExceptionAction() { @Override public DFSClient run() throws IOException { @@ -229,7 +288,7 @@ public class RouterDFSCluster { public DFSClient getClient() throws IOException, URISyntaxException { if (client == null) { - LOG.info("Connecting to namenode at " + fileSystemUri); + LOG.info("Connecting to namenode at {}", fileSystemUri); client = new DFSClient(fileSystemUri, conf); } return client; @@ -244,36 +303,20 @@ public class RouterDFSCluster { } } - public static final String NAMENODE1 = "nn0"; - public static final String NAMENODE2 = "nn1"; - public static final String NAMENODE3 = "nn2"; - public static final String TEST_STRING = "teststring"; - public static final String TEST_DIR = "testdir"; - public static 
final String TEST_FILE = "testfile"; - - private List nameservices; - private List routers; - private List namenodes; - private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class); - private MiniDFSCluster cluster; - private boolean highAvailability; - - protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5; - protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5; - private Configuration routerOverrides; - private Configuration namenodeOverrides; - - private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2; - - public RouterDFSCluster(boolean ha, int numNameservices) { - this(ha, numNameservices, 2); - } - public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { this.highAvailability = ha; configureNameservices(numNameservices, numNamenodes); } + public RouterDFSCluster(boolean ha, int numNameservices) { + this(ha, numNameservices, 2); + } + + /** + * Add configuration settings to override default Router settings. + * + * @param conf Router configuration overrides. + */ public void addRouterOverrides(Configuration conf) { if (this.routerOverrides == null) { this.routerOverrides = conf; @@ -282,6 +325,11 @@ public class RouterDFSCluster { } } + /** + * Add configuration settings to override default Namenode settings. + * + * @param conf Namenode configuration overrides. + */ public void addNamenodeOverrides(Configuration conf) { if (this.namenodeOverrides == null) { this.namenodeOverrides = conf; @@ -290,124 +338,134 @@ public class RouterDFSCluster { } } - public Configuration generateNamenodeConfiguration( - String defaultNameserviceId) { - Configuration c = new HdfsConfiguration(); + /** + * Generate the configuration for a client. + * + * @param nsId Nameservice identifier. + * @return New namenode configuration. 
+ */ + public Configuration generateNamenodeConfiguration(String nsId) { + Configuration conf = new HdfsConfiguration(); - c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey()); - c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId); + conf.set(DFS_NAMESERVICES, getNameservicesKey()); + conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId); for (String ns : nameservices) { if (highAvailability) { - c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES); + conf.set( + DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + NAMENODES[0] + "," + NAMENODES[1]); } for (NamenodeContext context : getNamenodes(ns)) { String suffix = context.getConfSuffix(); - c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, "127.0.0.1:" + context.rpcPort); - c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, + conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, "127.0.0.1:" + context.httpPort); - c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, + conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, "0.0.0.0"); } } - if (namenodeOverrides != null) { - c.addResource(namenodeOverrides); + if (this.namenodeOverrides != null) { + conf.addResource(this.namenodeOverrides); } - return c; + return conf; } + /** + * Generate the configuration for a client. + * + * @return New configuration for a client. 
+ */ public Configuration generateClientConfiguration() { - Configuration conf = new HdfsConfiguration(); - conf.addResource(generateNamenodeConfiguration(getNameservices().get(0))); + Configuration conf = new HdfsConfiguration(false); + String ns0 = getNameservices().get(0); + conf.addResource(generateNamenodeConfiguration(ns0)); return conf; } - public Configuration generateRouterConfiguration(String localNameserviceId, - String localNamenodeId) throws IOException { - Configuration conf = new HdfsConfiguration(); - conf.addResource(generateNamenodeConfiguration(localNameserviceId)); + /** + * Generate the configuration for a Router. + * + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + * @return New configuration for a Router. + */ + public Configuration generateRouterConfiguration(String nsId, String nnId) { + + Configuration conf = new HdfsConfiguration(false); + conf.addResource(generateNamenodeConfiguration(nsId)); + + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10); + conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); // Use mock resolver classes - conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, - MockResolver.class.getCanonicalName()); - conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, - MockResolver.class.getCanonicalName()); + conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class, ActiveNamenodeResolver.class); + conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class, FileSubclusterResolver.class); // Set the nameservice ID for the default NN monitor - conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId); - - if (localNamenodeId != null) { - conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId); + conf.set(DFS_NAMESERVICE_ID, nsId); + if (nnId != null) { + conf.set(DFS_HA_NAMENODE_ID_KEY, nnId); } - StringBuilder 
routerBuilder = new StringBuilder(); - for (String ns : nameservices) { - for (NamenodeContext context : getNamenodes(ns)) { - String suffix = context.getConfSuffix(); - - if (routerBuilder.length() != 0) { - routerBuilder.append(","); - } - routerBuilder.append(suffix); + // Add custom overrides if available + if (this.routerOverrides != null) { + for (Entry entry : this.routerOverrides) { + String confKey = entry.getKey(); + String confValue = entry.getValue(); + conf.set(confKey, confValue); } } - return conf; } public void configureNameservices(int numNameservices, int numNamenodes) { - nameservices = new ArrayList(); - for (int i = 0; i < numNameservices; i++) { - nameservices.add("ns" + i); - } - namenodes = new ArrayList(); - int index = 0; - for (String ns : nameservices) { - Configuration nnConf = generateNamenodeConfiguration(ns); - if (highAvailability) { - NamenodeContext context = - new NamenodeContext(nnConf, ns, NAMENODE1, index); - namenodes.add(context); - index++; - - if (numNamenodes > 1) { - context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1); - namenodes.add(context); - index++; - } + this.nameservices = new ArrayList<>(); + this.namenodes = new ArrayList<>(); - if (numNamenodes > 2) { - context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1); - namenodes.add(context); - index++; - } + NamenodeContext context = null; + int nnIndex = 0; + for (int i=0; i 0) { - ns.append(","); + StringBuilder sb = new StringBuilder(); + for (String nsId : this.nameservices) { + if (sb.length() > 0) { + sb.append(","); } - ns.append(nameservices.get(i)); + sb.append(nsId); } - return ns.toString(); + return sb.toString(); } public String getRandomNameservice() { Random r = new Random(); - return nameservices.get(r.nextInt(nameservices.size())); + int randIndex = r.nextInt(nameservices.size()); + return nameservices.get(randIndex); } public List getNameservices() { @@ -415,7 +473,7 @@ public class RouterDFSCluster { } public List 
getNamenodes(String nameservice) { - ArrayList nns = new ArrayList(); + List nns = new ArrayList<>(); for (NamenodeContext c : namenodes) { if (c.nameserviceId.equals(nameservice)) { nns.add(c); @@ -426,23 +484,23 @@ public class RouterDFSCluster { public NamenodeContext getRandomNamenode() { Random rand = new Random(); - return namenodes.get(rand.nextInt(namenodes.size())); + int i = rand.nextInt(this.namenodes.size()); + return this.namenodes.get(i); } public List getNamenodes() { - return namenodes; + return this.namenodes; } public boolean isHighAvailability() { return highAvailability; } - public NamenodeContext getNamenode(String nameservice, - String namenode) { - for (NamenodeContext c : namenodes) { + public NamenodeContext getNamenode(String nameservice, String namenode) { + for (NamenodeContext c : this.namenodes) { if (c.nameserviceId.equals(nameservice)) { - if (namenode == null || c.namenodeId == null || namenode.isEmpty() - || c.namenodeId.isEmpty()) { + if (namenode == null || namenode.isEmpty() || + c.namenodeId == null || c.namenodeId.isEmpty()) { return c; } else if (c.namenodeId.equals(namenode)) { return c; @@ -453,7 +511,7 @@ public class RouterDFSCluster { } public List getRouters(String nameservice) { - ArrayList nns = new ArrayList(); + List nns = new ArrayList<>(); for (RouterContext c : routers) { if (c.nameserviceId.equals(nameservice)) { nns.add(c); @@ -462,14 +520,13 @@ public class RouterDFSCluster { return nns; } - public RouterContext getRouterContext(String nameservice, - String namenode) { + public RouterContext getRouterContext(String nsId, String nnId) { for (RouterContext c : routers) { - if (namenode == null) { + if (nnId == null) { return c; } - if (c.namenodeId.equals(namenode) - && c.nameserviceId.equals(nameservice)) { + if (c.namenodeId.equals(nnId) && + c.nameserviceId.equals(nsId)) { return c; } } @@ -485,10 +542,10 @@ public class RouterDFSCluster { return routers; } - public RouterContext buildRouter(String 
nameservice, String namenode) + public RouterContext buildRouter(String nsId, String nnId) throws URISyntaxException, IOException { - Configuration config = generateRouterConfiguration(nameservice, namenode); - RouterContext rc = new RouterContext(config, nameservice, namenode); + Configuration config = generateRouterConfiguration(nsId, nnId); + RouterContext rc = new RouterContext(config, nsId, nnId); return rc; } @@ -500,10 +557,9 @@ public class RouterDFSCluster { try { MiniDFSNNTopology topology = new MiniDFSNNTopology(); for (String ns : nameservices) { - NSConf conf = new MiniDFSNNTopology.NSConf(ns); if (highAvailability) { - for(int i = 0; i < namenodes.size()/nameservices.size(); i++) { + for (int i=0; i(); - for (String ns : nameservices) { + // Create one router per nameservice + this.routers = new ArrayList<>(); + for (String ns : this.nameservices) { for (NamenodeContext context : getNamenodes(ns)) { - routers.add(buildRouter(ns, context.namenodeId)); + RouterContext router = buildRouter(ns, context.namenodeId); + this.routers.add(router); } } // Start all routers - for (RouterContext router : routers) { + for (RouterContext router : this.routers) { router.router.start(); } + // Wait until all routers are active and record their ports - for (RouterContext router : routers) { + for (RouterContext router : this.routers) { waitActive(router); router.initRouter(); } @@ -570,22 +634,21 @@ public class RouterDFSCluster { } Thread.sleep(1000); } - assertFalse( - "Timeout waiting for " + router.router.toString() + " to activate.", - true); + fail("Timeout waiting for " + router.router + " to activate"); } - public void registerNamenodes() throws IOException { - for (RouterContext r : routers) { + for (RouterContext r : this.routers) { ActiveNamenodeResolver resolver = r.router.getNamenodeResolver(); - for (NamenodeContext nn : namenodes) { + for (NamenodeContext nn : this.namenodes) { // Generate a report - NamenodeStatusReport report = new 
NamenodeStatusReport(nn.nameserviceId, - nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(), + NamenodeStatusReport report = new NamenodeStatusReport( + nn.nameserviceId, nn.namenodeId, + nn.getRpcAddress(), nn.getServiceAddress(), nn.getLifelineAddress(), nn.getHttpAddress()); - report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage() - .getStorage().getNamespaceInfo()); + FSImage fsImage = nn.namenode.getNamesystem().getFSImage(); + NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo(); + report.setNamespaceInfo(nsInfo); // Determine HA state from nn public state string String nnState = nn.namenode.getState(); @@ -606,74 +669,97 @@ public class RouterDFSCluster { public void waitNamenodeRegistration() throws InterruptedException, IllegalStateException, IOException { - for (RouterContext r : routers) { - for (NamenodeContext nn : namenodes) { - FederationTestUtils.waitNamenodeRegistered( - r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId, - null); + for (RouterContext r : this.routers) { + Router router = r.router; + for (NamenodeContext nn : this.namenodes) { + ActiveNamenodeResolver nnResolver = router.getNamenodeResolver(); + waitNamenodeRegistered( + nnResolver, nn.nameserviceId, nn.namenodeId, null); } } } public void waitRouterRegistrationQuorum(RouterContext router, - FederationNamenodeServiceState state, String nameservice, String namenode) + FederationNamenodeServiceState state, String nsId, String nnId) throws InterruptedException, IOException { - LOG.info("Waiting for NN - " + nameservice + ":" + namenode - + " to transition to state - " + state); - FederationTestUtils.waitNamenodeRegistered( - router.router.getNamenodeResolver(), nameservice, namenode, state); + LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state); + ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver(); + waitNamenodeRegistered(nnResolver, nsId, nnId, state); } - public String 
getFederatedPathForNameservice(String ns) { - return "/" + ns; + /** + * Get the federated path for a nameservice. + * @param nsId Nameservice identifier. + * @return Path in the Router. + */ + public String getFederatedPathForNS(String nsId) { + return "/" + nsId; } - public String getNamenodePathForNameservice(String ns) { - return "/target-" + ns; + /** + * Get the namenode path for a nameservice. + * @param nsId Nameservice identifier. + * @return Path in the Namenode. + */ + public String getNamenodePathForNS(String nsId) { + return "/target-" + nsId; } /** - * @return example: + * Get the federated test directory for a nameservice. + * @param nsId Nameservice identifier. + * @return Example: *
    *
  • /ns0/testdir which maps to ns0->/target-ns0/testdir *
*/ - public String getFederatedTestDirectoryForNameservice(String ns) { - return getFederatedPathForNameservice(ns) + "/" + TEST_DIR; + public String getFederatedTestDirectoryForNS(String nsId) { + return getFederatedPathForNS(nsId) + "/" + TEST_DIR; } /** + * Get the namenode test directory for a nameservice. + * @param nsId Nameservice identifier. * @return example: *
    *
  • /target-ns0/testdir *
*/ - public String getNamenodeTestDirectoryForNameservice(String ns) { - return getNamenodePathForNameservice(ns) + "/" + TEST_DIR; + public String getNamenodeTestDirectoryForNS(String nsId) { + return getNamenodePathForNS(nsId) + "/" + TEST_DIR; } /** + * Get the federated test file for a nameservice. + * @param nsId Nameservice identifier. * @return example: *
    *
  • /ns0/testfile which maps to ns0->/target-ns0/testfile *
*/ - public String getFederatedTestFileForNameservice(String ns) { - return getFederatedPathForNameservice(ns) + "/" + TEST_FILE; + public String getFederatedTestFileForNS(String nsId) { + return getFederatedPathForNS(nsId) + "/" + TEST_FILE; } /** + * Get the namenode test file for a nameservice. + * @param nsId Nameservice identifier. * @return example: *
    *
  • /target-ns0/testfile *
*/ - public String getNamenodeTestFileForNameservice(String ns) { - return getNamenodePathForNameservice(ns) + "/" + TEST_FILE; + public String getNamenodeTestFileForNS(String nsId) { + return getNamenodePathForNS(nsId) + "/" + TEST_FILE; } + /** + * Stop the federated HDFS cluster. + */ public void shutdown() { - cluster.shutdown(); + if (cluster != null) { + cluster.shutdown(); + } if (routers != null) { for (RouterContext context : routers) { stopRouter(context); @@ -681,9 +767,12 @@ public class RouterDFSCluster { } } + /** + * Stop a router. + * @param router Router context. + */ public void stopRouter(RouterContext router) { try { - router.router.shutDown(); int loopCount = 0; @@ -691,7 +780,7 @@ public class RouterDFSCluster { loopCount++; Thread.sleep(1000); if (loopCount > 20) { - LOG.error("Unable to shutdown router - " + router.rpcPort); + LOG.error("Cannot shutdown router {}", router.rpcPort); break; } } @@ -714,26 +803,28 @@ public class RouterDFSCluster { for (String ns : getNameservices()) { NamenodeContext context = getNamenode(ns, null); if (!createTestDirectoriesNamenode(context)) { - throw new IOException("Unable to create test directory for ns - " + ns); + throw new IOException("Cannot create test directory for ns " + ns); } } } public boolean createTestDirectoriesNamenode(NamenodeContext nn) throws IOException { - return FederationTestUtils.addDirectory(nn.getFileSystem(), - getNamenodeTestDirectoryForNameservice(nn.nameserviceId)); + FileSystem fs = nn.getFileSystem(); + String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId); + return addDirectory(fs, testDir); } public void deleteAllFiles() throws IOException { // Delete all files via the NNs and verify for (NamenodeContext context : getNamenodes()) { - FileStatus[] status = context.getFileSystem().listStatus(new Path("/")); - for(int i = 0; i nss = cluster.getNameservices(); + String ns0 = nss.get(0); + Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null); + 
RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter, + testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver()); + server.init(routerConfig); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + testRouter.close(); + } + + protected RouterDFSCluster getCluster() { + return TestRouterRpc.cluster; + } + + protected RouterContext getRouterContext() { + return this.router; + } + + protected void setRouter(RouterContext r) + throws IOException, URISyntaxException { + this.router = r; + this.routerProtocol = r.getClient().getNamenode(); + this.routerFS = r.getFileSystem(); + } + + protected FileSystem getRouterFileSystem() { + return this.routerFS; + } + + protected FileSystem getNamenodeFileSystem() { + return this.nnFS; + } + + protected ClientProtocol getRouterProtocol() { + return this.routerProtocol; + } + + protected ClientProtocol getNamenodeProtocol() { + return this.nnProtocol; + } + + protected NamenodeContext getNamenode() { + return this.namenode; + } + + protected void setNamenodeFile(String filename) { + this.nnFile = filename; + } + + protected String getNamenodeFile() { + return this.nnFile; + } + + protected void setRouterFile(String filename) { + this.routerFile = filename; + } + + protected String getRouterFile() { + return this.routerFile; + } + + protected void setNamenode(NamenodeContext nn) + throws IOException, URISyntaxException { + this.namenode = nn; + this.nnProtocol = nn.getClient().getNamenode(); + this.nnFS = nn.getFileSystem(); + } + + protected String getNs() { + return this.ns; + } + + protected void setNs(String nameservice) { + this.ns = nameservice; + } + + protected static void compareResponses( + ClientProtocol protocol1, ClientProtocol protocol2, + Method m, Object[] paramList) { + + Object return1 = null; + Exception exception1 = 
null; + try { + return1 = m.invoke(protocol1, paramList); + } catch (Exception ex) { + exception1 = ex; + } + + Object return2 = null; + Exception exception2 = null; + try { + return2 = m.invoke(protocol2, paramList); + } catch (Exception ex) { + exception2 = ex; + } + + assertEquals(return1, return2); + if (exception1 == null && exception2 == null) { + return; + } + + assertEquals( + exception1.getCause().getClass(), + exception2.getCause().getClass()); + } + + @Test + public void testProxyListFiles() throws IOException, InterruptedException, + URISyntaxException, NoSuchMethodException, SecurityException { + + // Verify that the root listing is a union of the mount table destinations + // and the files stored at all nameservices mounted at the root (ns0 + ns1) + // + // / --> + // /ns0 (from mount table) + // /ns1 (from mount table) + // all items in / of ns0 (default NS) + + // Collect the mount table entries from the root mount point + Set requiredPaths = new TreeSet<>(); + FileSubclusterResolver fileResolver = + router.getRouter().getSubclusterResolver(); + for (String mount : fileResolver.getMountPoints("/")) { + requiredPaths.add(mount); + } + + // Collect all files/dirs on the root path of the default NS + String defaultNs = cluster.getNameservices().get(0); + NamenodeContext nn = cluster.getNamenode(defaultNs, null); + FileStatus[] iterator = nn.getFileSystem().listStatus(new Path("/")); + for (FileStatus file : iterator) { + requiredPaths.add(file.getPath().getName()); + } + + // Fetch listing + DirectoryListing listing = + routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false); + Iterator requiredPathsIterator = requiredPaths.iterator(); + // Match each path returned and verify order returned + for(HdfsFileStatus f : listing.getPartialListing()) { + String fileName = requiredPathsIterator.next(); + String currentFile = f.getFullPath(new Path("/")).getName(); + assertEquals(currentFile, fileName); + } + + // Verify the total number of results 
found/matched + assertEquals(requiredPaths.size(), listing.getPartialListing().length); + + // List a path that doesn't exist and validate error response with NN + // behavior. + Method m = ClientProtocol.class.getMethod( + "getListing", String.class, byte[].class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); + } + + @Test + public void testProxyListFilesWithConflict() + throws IOException, InterruptedException { + + // Add a directory to the namespace that conflicts with a mount point + NamenodeContext nn = cluster.getNamenode(ns, null); + FileSystem nnFs = nn.getFileSystem(); + addDirectory(nnFs, cluster.getFederatedTestDirectoryForNS(ns)); + + FileSystem routerFs = router.getFileSystem(); + int initialCount = countContents(routerFs, "/"); + + // Root file system now for NS X: + // / -> + // /ns0 (mount table) + // /ns1 (mount table) + // /target-ns0 (the target folder for the NS0 mapped to / + // /nsX (local directory that duplicates mount table) + int newCount = countContents(routerFs, "/"); + assertEquals(initialCount, newCount); + + // Verify that each root path is readable and contains one test directory + assertEquals(1, countContents(routerFs, cluster.getFederatedPathForNS(ns))); + + // Verify that real folder for the ns contains a single test directory + assertEquals(1, countContents(nnFs, cluster.getNamenodePathForNS(ns))); + + } + + protected void testRename(RouterContext testRouter, String filename, + String renamedFile, boolean exceptionExpected) throws IOException { + + createFile(testRouter.getFileSystem(), filename, 32); + // verify + verifyFileExists(testRouter.getFileSystem(), filename); + // rename + boolean exceptionThrown = false; + try { + DFSClient client = testRouter.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(filename, renamedFile); + } catch (Exception ex) { + 
exceptionThrown = true; + } + if (exceptionExpected) { + // Error was expected + assertTrue(exceptionThrown); + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(filename), true)); + } else { + // No error was expected + assertFalse(exceptionThrown); + // verify + assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile)); + // delete + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(renamedFile), true)); + } + } + + protected void testRename2(RouterContext testRouter, String filename, + String renamedFile, boolean exceptionExpected) throws IOException { + createFile(testRouter.getFileSystem(), filename, 32); + // verify + verifyFileExists(testRouter.getFileSystem(), filename); + // rename + boolean exceptionThrown = false; + try { + DFSClient client = testRouter.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {}); + } catch (Exception ex) { + exceptionThrown = true; + } + assertEquals(exceptionExpected, exceptionThrown); + if (exceptionExpected) { + // Error was expected + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(filename), true)); + } else { + // verify + assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile)); + // delete + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(renamedFile), true)); + } + } + + @Test + public void testProxyRenameFiles() throws IOException, InterruptedException { + + Thread.sleep(5000); + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Rename within the same namespace + // /ns0/testdir/testrename -> /ns0/testdir/testrename-append + String filename = + cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename"; + String renamedFile = filename + "-append"; + testRename(router, 
filename, renamedFile, false); + testRename2(router, filename, renamedFile, false); + + // Rename a file to a destination that is in a different namespace (fails) + filename = cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename"; + renamedFile = cluster.getFederatedTestDirectoryForNS(ns1) + "/testrename"; + testRename(router, filename, renamedFile, true); + testRename2(router, filename, renamedFile, true); + } + + @Test + public void testProxyChownFiles() throws Exception { + + String newUsername = "TestUser"; + String newGroup = "TestGroup"; + + // change owner + routerProtocol.setOwner(routerFile, newUsername, newGroup); + + // Verify with NN + FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile); + assertEquals(file.getOwner(), newUsername); + assertEquals(file.getGroup(), newGroup); + + // Bad request and validate router response matches NN response. + Method m = ClientProtocol.class.getMethod("setOwner", String.class, + String.class, String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, newUsername, newGroup}); + } + + @Test + public void testProxyGetStats() throws Exception { + + long[] combinedData = routerProtocol.getStats(); + + long[] individualData = new long[10]; + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + long[] data = clientProtocol.getStats(); + for (int i = 0; i < data.length; i++) { + individualData[i] += data[i]; + } + assert(data.length == combinedData.length); + } + + for (int i = 0; i < combinedData.length && i < individualData.length; i++) { + if (i == ClientProtocol.GET_STATS_REMAINING_IDX) { + // Skip available storage as this fluctuates in mini cluster + continue; + } + assertEquals(combinedData[i], individualData[i]); + } + } + + @Test + public void 
testProxyGetDatanodeReport() throws Exception { + + DatanodeInfo[] combinedData = + routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + + Set individualData = new HashSet(); + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + DatanodeInfo[] data = + clientProtocol.getDatanodeReport(DatanodeReportType.ALL); + for (int i = 0; i < data.length; i++) { + // Collect unique DNs based on their xfer port + DatanodeInfo info = data[i]; + individualData.add(info.getXferPort()); + } + } + assertEquals(combinedData.length, individualData.size()); + } + + @Test + public void testProxyGetDatanodeStorageReport() + throws IOException, InterruptedException, URISyntaxException { + + DatanodeStorageReport[] combinedData = + routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL); + + Set individualData = new HashSet<>(); + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + DatanodeStorageReport[] data = + clientProtocol.getDatanodeStorageReport(DatanodeReportType.ALL); + for (DatanodeStorageReport report : data) { + // Determine unique DN instances + DatanodeInfo dn = report.getDatanodeInfo(); + individualData.add(dn.toString()); + } + } + assertEquals(combinedData.length, individualData.size()); + } + + @Test + public void testProxyMkdir() throws Exception { + + // Check the initial folders + FileStatus[] filesInitial = routerFS.listStatus(new Path("/")); + + // Create a directory via the router at the root level + String dirPath = "/testdir"; + FsPermission permission = new FsPermission("705"); + routerProtocol.mkdirs(dirPath, permission, false); + + // Verify the root listing has the item via the router + FileStatus[] files = routerFS.listStatus(new 
Path("/")); + assertEquals(Arrays.toString(files) + " should be " + + Arrays.toString(filesInitial) + " + " + dirPath, + filesInitial.length + 1, files.length); + assertTrue(verifyFileExists(routerFS, dirPath)); + + // Verify the directory is present in only 1 Namenode + int foundCount = 0; + for (NamenodeContext n : cluster.getNamenodes()) { + if (verifyFileExists(n.getFileSystem(), dirPath)) { + foundCount++; + } + } + assertEquals(1, foundCount); + assertTrue(deleteFile(routerFS, dirPath)); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("mkdirs", String.class, + FsPermission.class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, permission, false}); + } + + @Test + public void testProxyChmodFiles() throws Exception { + + FsPermission permission = new FsPermission("444"); + + // change permissions + routerProtocol.setPermission(routerFile, permission); + + // Validate permissions NN + FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile); + assertEquals(permission, file.getPermission()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "setPermission", String.class, FsPermission.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, permission}); + } + + @Test + public void testProxySetReplication() throws Exception { + + // Check current replication via NN + FileStatus file = getFileStatus(nnFS, nnFile); + assertEquals(1, file.getReplication()); + + // increment replication via router + routerProtocol.setReplication(routerFile, (short) 2); + + // Verify via NN + file = getFileStatus(nnFS, nnFile); + assertEquals(2, file.getReplication()); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod( + "setReplication", String.class, short.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, (short) 2}); + } + + @Test + public void testProxyTruncateFile() throws Exception { + + // Check file size via NN + FileStatus file = getFileStatus(nnFS, nnFile); + assertTrue(file.getLen() > 0); + + // Truncate to 0 bytes via router + routerProtocol.truncate(routerFile, 0, "testclient"); + + // Verify via NN + file = getFileStatus(nnFS, nnFile); + assertEquals(0, file.getLen()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "truncate", String.class, long.class, String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, (long) 0, "testclient"}); + } + + @Test + public void testProxyGetBlockLocations() throws Exception { + + // Fetch block locations via router + LocatedBlocks locations = + routerProtocol.getBlockLocations(routerFile, 0, 1024); + assertEquals(1, locations.getLocatedBlocks().size()); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod( + "getBlockLocations", String.class, long.class, long.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, + m, new Object[] {badPath, (long) 0, (long) 0}); + } + + @Test + public void testProxyStoragePolicy() throws Exception { + + // Query initial policy via NN + HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile); + + // Set a random policy via router + BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies(); + BlockStoragePolicy policy = policies[0]; + + while (policy.isCopyOnCreateFile()) { + // Pick a non copy on create policy + Random rand = new Random(); + int randIndex = rand.nextInt(policies.length); + policy = policies[randIndex]; + } + routerProtocol.setStoragePolicy(routerFile, policy.getName()); + + // Verify policy via NN + HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile); + assertTrue(newStatus.getStoragePolicy() == policy.getId()); + assertTrue(newStatus.getStoragePolicy() != status.getStoragePolicy()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class, + String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, + m, new Object[] {badPath, "badpolicy"}); + } + + @Test + public void testProxyGetPreferedBlockSize() throws Exception { + + // Query via NN and Router and verify + long namenodeSize = nnProtocol.getPreferredBlockSize(nnFile); + long routerSize = routerProtocol.getPreferredBlockSize(routerFile); + assertEquals(routerSize, namenodeSize); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod( + "getPreferredBlockSize", String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses( + routerProtocol, nnProtocol, m, new Object[] {badPath}); + } + + private void testConcat( + String source, String target, boolean failureExpected) { + boolean failure = false; + try { + // Concat test file with fill block length file via router + routerProtocol.concat(target, new String[] {source}); + } catch (IOException ex) { + failure = true; + } + assertEquals(failureExpected, failure); + } + + @Test + public void testProxyConcatFile() throws Exception { + + // Create a stub file in the primary ns + String sameNameservice = ns; + String existingFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_concatfile"; + int existingFileSize = 32; + createFile(routerFS, existingFile, existingFileSize); + + // Identify an alternate nameservice that doesn't match the existing file + String alternateNameservice = null; + for (String n : cluster.getNameservices()) { + if (!n.equals(sameNameservice)) { + alternateNameservice = n; + break; + } + } + + // Create new files, must be a full block to use concat. One file is in the + // same namespace as the target file, the other is in a different namespace. 
+ String altRouterFile = + cluster.getFederatedTestDirectoryForNS(alternateNameservice) + + "_newfile"; + String sameRouterFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_newfile"; + createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + + // Concat in different namespaces, fails + testConcat(existingFile, altRouterFile, true); + + // Concat in same namespaces, succeeds + testConcat(existingFile, sameRouterFile, false); + + // Check target file length + FileStatus status = getFileStatus(routerFS, sameRouterFile); + assertEquals( + existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, + status.getLen()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "concat", String.class, String[].class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, new String[] {routerFile}}); + } + + @Test + public void testProxyAppend() throws Exception { + + // Append a test string via router + EnumSet createFlag = EnumSet.of(CreateFlag.APPEND); + DFSClient routerClient = getRouterContext().getClient(); + HdfsDataOutputStream stream = + routerClient.append(routerFile, 1024, createFlag, null, null); + stream.writeBytes(TEST_STRING); + stream.close(); + + // Verify file size via NN + FileStatus status = getFileStatus(nnFS, nnFile); + assertTrue(status.getLen() > TEST_STRING.length()); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod("append", String.class, + String.class, EnumSetWritable.class); + String badPath = "/unknownlocation/unknowndir"; + EnumSetWritable createFlagWritable = + new EnumSetWritable(createFlag); + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, "testClient", createFlagWritable}); + } + + @Test + public void testProxyGetAdditionalDatanode() + throws IOException, InterruptedException, URISyntaxException { + + // Use primitive APIs to open a file, add a block, and get datanode location + EnumSet createFlag = EnumSet.of(CreateFlag.CREATE); + String clientName = getRouterContext().getClient().getClientName(); + String newRouterFile = routerFile + "_additionalDatanode"; + HdfsFileStatus status = routerProtocol.create( + newRouterFile, new FsPermission("777"), clientName, + new EnumSetWritable(createFlag), true, (short) 1, + (long) 1024, CryptoProtocolVersion.supported(), null); + + // Add a block via router (requires client to have same lease) + LocatedBlock block = routerProtocol.addBlock( + newRouterFile, clientName, null, null, + status.getFileId(), null, null); + + DatanodeInfo[] exclusions = new DatanodeInfo[0]; + LocatedBlock newBlock = routerProtocol.getAdditionalDatanode( + newRouterFile, status.getFileId(), block.getBlock(), + block.getLocations(), block.getStorageIDs(), exclusions, 1, clientName); + assertNotNull(newBlock); + } + + @Test + public void testProxyCreateFileAlternateUser() + throws IOException, URISyntaxException, InterruptedException { + + // Create via Router + String routerDir = cluster.getFederatedTestDirectoryForNS(ns); + String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns); + String newRouterFile = routerDir + "/unknownuser"; + String newNamenodeFile = namenodeDir + "/unknownuser"; + String username = "unknownuser"; + + // Allow all user access to dir + namenode.getFileContext().setPermission( + new Path(namenodeDir), new FsPermission("777")); + + UserGroupInformation ugi 
= UserGroupInformation.createRemoteUser(username); + DFSClient client = getRouterContext().getClient(ugi); + client.create(newRouterFile, true); + + // Fetch via NN and check user + FileStatus status = getFileStatus(nnFS, newNamenodeFile); + assertEquals(status.getOwner(), username); + } + + @Test + public void testProxyGetFileInfoAcessException() throws IOException { + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser("unknownuser"); + + // List files from the NN and trap the exception + Exception nnFailure = null; + try { + String testFile = cluster.getNamenodeTestFileForNS(ns); + namenode.getClient(ugi).getLocatedBlocks(testFile, 0); + } catch (Exception e) { + nnFailure = e; + } + assertNotNull(nnFailure); + + // List files from the router and trap the exception + Exception routerFailure = null; + try { + String testFile = cluster.getFederatedTestFileForNS(ns); + getRouterContext().getClient(ugi).getLocatedBlocks(testFile, 0); + } catch (Exception e) { + routerFailure = e; + } + assertNotNull(routerFailure); + + assertEquals(routerFailure.getClass(), nnFailure.getClass()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java new file mode 100644 index 0000000..5489691 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; 
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+
+/**
+ * Tests the RPC interface of the {@link Router} implemented by
+ * {@link RouterRpcServer} when mount table entries resolve to more than one
+ * destination.
+ */
+public class TestRouterRpcMultiDestination extends TestRouterRpc {
+
+  /**
+   * Prepares the cluster for the multi-destination tests: installs the mock
+   * mount table, maps "/" to a second nameservice, adds a "/same" mount with
+   * two destinations in one nameservice, recreates the test directories and
+   * creates one test file that must be visible through the router.
+   *
+   * @throws Exception if any of the cluster or file system calls fails.
+   */
+  @Override
+  public void testSetup() throws Exception {
+
+    RouterDFSCluster cluster = getCluster();
+
+    // Create mock locations
+    cluster.installMockLocations();
+    List<RouterContext> routers = cluster.getRouters();
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Add extra location to the root mount / such that the root mount points:
+    // /
+    //   ns0 -> /
+    //   ns1 -> /
+    for (RouterContext rc : routers) {
+      Router router = rc.getRouter();
+      MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+      resolver.addLocation("/", ns1, "/");
+    }
+
+    // Create a mount that points to 2 dirs in the same ns:
+    // /same
+    //   ns0 -> /
+    //   ns0 -> /target-ns0
+    for (RouterContext rc : routers) {
+      Router router = rc.getRouter();
+      MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+      resolver.addLocation("/same", ns0, "/");
+      resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
+    }
+
+    // Delete all files via the NNs and verify
+    cluster.deleteAllFiles();
+
+    // Create test fixtures on NN
+    cluster.createTestDirectoriesNamenode();
+
+    // Wait to ensure NN has fully created its test directories
+    Thread.sleep(100);
+
+    // Pick a NS, namenode and router for this test
+    RouterContext router = cluster.getRandomRouter();
+    this.setRouter(router);
+
+    String ns = cluster.getRandomNameservice();
+    this.setNs(ns);
+    this.setNamenode(cluster.getNamenode(ns, null));
+
+    // Create a test file on a single NN that is accessed via a router path
+    // with 2 destinations. All tests should failover to the alternate
+    // destination if the wrong NN is attempted first.
+    Random r = new Random();
+    String randomString = "testfile-" + r.nextInt();
+    setNamenodeFile("/" + randomString);
+    setRouterFile("/" + randomString);
+
+    FileSystem nnFs = getNamenodeFileSystem();
+    FileSystem routerFs = getRouterFileSystem();
+    createFile(nnFs, getNamenodeFile(), 32);
+
+    verifyFileExists(nnFs, getNamenodeFile());
+    verifyFileExists(routerFs, getRouterFile());
+  }
+
+  /**
+   * Checks that the listing returned by the router for a path equals the
+   * sorted union of the mount points under that path and the entries found
+   * at each of the destinations the path resolves to.
+   *
+   * @param path Path in the global namespace to list.
+   * @throws IOException if the resolver, a namenode or the router fails.
+   */
+  private void testListing(String path) throws IOException {
+
+    // Collect the mount table entries for this path; the TreeSet keeps the
+    // expected names sorted and free of duplicates
+    Set<String> requiredPaths = new TreeSet<>();
+    RouterContext rc = getRouterContext();
+    Router router = rc.getRouter();
+    FileSubclusterResolver subclusterResolver = router.getSubclusterResolver();
+    requiredPaths.addAll(subclusterResolver.getMountPoints(path));
+
+    // Get files/dirs from the Namenodes
+    PathLocation location = subclusterResolver.getDestinationForPath(path);
+    for (RemoteLocation loc : location.getDestinations()) {
+      String nsId = loc.getNameserviceId();
+      String dest = loc.getDest();
+      NamenodeContext nn = getCluster().getNamenode(nsId, null);
+      FileSystem fs = nn.getFileSystem();
+      for (FileStatus file : fs.listStatus(new Path(dest))) {
+        requiredPaths.add(file.getPath().getName());
+      }
+    }
+
+    // Get files/dirs from the Router
+    DirectoryListing listing =
+        getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false);
+    HdfsFileStatus[] partialListing = listing.getPartialListing();
+
+    // Verify the total number of results first, so that a size mismatch is
+    // reported with both listings instead of a NoSuchElementException
+    assertEquals(
+        requiredPaths + " doesn't match " + Arrays.toString(partialListing),
+        requiredPaths.size(), partialListing.length);
+
+    // Match each path returned and verify order returned
+    Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+    Path parent = new Path(path);
+    for (HdfsFileStatus fileStatus : partialListing) {
+      String expectedName = requiredPathsIterator.next();
+      String actualName = fileStatus.getFullPath(parent).getName();
+      assertEquals(expectedName, actualName);
+    }
+  }
+
+  /**
+   * Lists "/" and "/same" through the router and compares the result with
+   * the mount table plus the namenode contents, then checks that listing a
+   * non-existing path gives the same response as a namenode does.
+   */
+  @Override
+  public void testProxyListFiles() throws IOException, InterruptedException,
+      URISyntaxException, NoSuchMethodException, SecurityException {
+
+    // Verify that the root listing is a union of the mount table destinations
+    // and the files stored at all nameservices mounted at the root (ns0 + ns1)
+    // / -->
+    //   /ns0 (from mount table)
+    //   /ns1 (from mount table)
+    //   /same (from the mount table)
+    //   all items in / of ns0 (from mapping of / -> ns0:::/)
+    //   all items in / of ns1 (from mapping of / -> ns1:::/)
+    testListing("/");
+
+    // Verify that the "/same" mount point lists the contents of both dirs in
+    // the same ns
+    // /same -->
+    //   /target-ns0 (from root of ns0)
+    //   /testdir (from contents of /target-ns0)
+    testListing("/same");
+
+    // List a non-existing path and validate error response with NN behavior
+    ClientProtocol namenodeProtocol =
+        getCluster().getRandomNamenode().getClient().getNamenode();
+    Method m = ClientProtocol.class.getMethod(
+        "getListing", String.class, byte[].class, boolean.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(getRouterProtocol(), namenodeProtocol, m,
+        new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
+  }
+
+  /**
+   * Runs the inherited rename checks and then renames a file from the test
+   * directory of each of the first two nameservices into the root, which is
+   * mapped to both of them.
+   */
+  @Override
+  public void testProxyRenameFiles() throws IOException, InterruptedException {
+
+    super.testProxyRenameFiles();
+
+    List<String> nss = getCluster().getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Rename a file from ns0 into the root (mapped to both ns0 and ns1)
+    String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0);
+    String filename0 = testDir0 + "/testrename";
+    String renamedFile = "/testrename";
+    testRename(getRouterContext(), filename0, renamedFile, false);
+    testRename2(getRouterContext(), filename0, renamedFile, false);
+
+    // Rename a file from ns1 into the root (mapped to both ns0 and ns1)
+    String testDir1 = getCluster().getFederatedTestDirectoryForNS(ns1);
+    String filename1 = testDir1 + "/testrename";
+    testRename(getRouterContext(), filename1, renamedFile, false);
+    testRename2(getRouterContext(), filename1, renamedFile, false);
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org