Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A6EB9200D31 for ; Fri, 20 Oct 2017 20:23:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A5975160BEE; Fri, 20 Oct 2017 18:23:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 01DFA1609ED for ; Fri, 20 Oct 2017 20:23:38 +0200 (CEST) Received: (qmail 72175 invoked by uid 500); 20 Oct 2017 18:23:32 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 71176 invoked by uid 99); 20 Oct 2017 18:23:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Oct 2017 18:23:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B87BE004D; Fri, 20 Oct 2017 18:23:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vrushali@apache.org To: common-commits@hadoop.apache.org Date: Fri, 20 Oct 2017 18:23:47 -0000 Message-Id: <3da4b3495f034f41a07aab27a9a52c08@git.apache.org> In-Reply-To: <2cfc589ade964cf294424afa61e054cd@git.apache.org> References: <2cfc589ade964cf294424afa61e054cd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/50] hadoop git commit: HDFS-10629. Federation Roter. Contributed by Jason Kace and Inigo Goiri. 
archived-at: Fri, 20 Oct 2017 18:23:40 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51de708/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java new file mode 100644 index 0000000..ee6f57d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityComparator; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.util.Time; + +/** + * In-memory cache/mock of a namenode and file resolver. Stores the most + * recently updated NN information for each nameservice and block pool. Also + * stores a virtual mount table for resolving global namespace paths to local NN + * paths. 
+ */ +public class MockResolver + implements ActiveNamenodeResolver, FileSubclusterResolver { + + private Map> resolver = + new HashMap>(); + private Map> locations = + new HashMap>(); + private Set namespaces = + new HashSet(); + private String defaultNamespace = null; + + public MockResolver(Configuration conf, StateStoreService store) { + this.cleanRegistrations(); + } + + public void addLocation(String mount, String nameservice, String location) { + RemoteLocation remoteLocation = new RemoteLocation(nameservice, location); + List locationsList = locations.get(mount); + if (locationsList == null) { + locationsList = new LinkedList(); + locations.put(mount, locationsList); + } + if (!locationsList.contains(remoteLocation)) { + locationsList.add(remoteLocation); + } + + if (this.defaultNamespace == null) { + this.defaultNamespace = nameservice; + } + } + + public synchronized void cleanRegistrations() { + this.resolver = + new HashMap>(); + this.namespaces = new HashSet(); + } + + @Override + public void updateActiveNamenode( + String ns, InetSocketAddress successfulAddress) { + + String address = successfulAddress.getHostName() + ":" + + successfulAddress.getPort(); + String key = ns; + if (key != null) { + // Update the active entry + @SuppressWarnings("unchecked") + List iterator = + (List) resolver.get(key); + for (FederationNamenodeContext namenode : iterator) { + if (namenode.getRpcAddress().equals(address)) { + MockNamenodeContext nn = (MockNamenodeContext) namenode; + nn.setState(FederationNamenodeServiceState.ACTIVE); + break; + } + } + Collections.sort(iterator, new NamenodePriorityComparator()); + } + } + + @Override + public List + getNamenodesForNameserviceId(String nameserviceId) { + return resolver.get(nameserviceId); + } + + @Override + public List getNamenodesForBlockPoolId( + String blockPoolId) { + return resolver.get(blockPoolId); + } + + private static class MockNamenodeContext + implements FederationNamenodeContext { + private String 
webAddress; + private String rpcAddress; + private String serviceAddress; + private String lifelineAddress; + private String namenodeId; + private String nameserviceId; + private FederationNamenodeServiceState state; + private long dateModified; + + MockNamenodeContext( + String rpc, String service, String lifeline, String web, + String ns, String nn, FederationNamenodeServiceState state) { + this.rpcAddress = rpc; + this.serviceAddress = service; + this.lifelineAddress = lifeline; + this.webAddress = web; + this.namenodeId = nn; + this.nameserviceId = ns; + this.state = state; + this.dateModified = Time.now(); + } + + public void setState(FederationNamenodeServiceState newState) { + this.state = newState; + this.dateModified = Time.now(); + } + + @Override + public String getRpcAddress() { + return rpcAddress; + } + + @Override + public String getServiceAddress() { + return serviceAddress; + } + + @Override + public String getLifelineAddress() { + return lifelineAddress; + } + + @Override + public String getWebAddress() { + return webAddress; + } + + @Override + public String getNamenodeKey() { + return nameserviceId + " " + namenodeId + " " + rpcAddress; + } + + @Override + public String getNameserviceId() { + return nameserviceId; + } + + @Override + public String getNamenodeId() { + return namenodeId; + } + + @Override + public FederationNamenodeServiceState getState() { + return state; + } + + @Override + public long getDateModified() { + return dateModified; + } + } + + @Override + public synchronized boolean registerNamenode(NamenodeStatusReport report) + throws IOException { + MockNamenodeContext context = new MockNamenodeContext( + report.getRpcAddress(), report.getServiceAddress(), + report.getLifelineAddress(), report.getWebAddress(), + report.getNameserviceId(), report.getNamenodeId(), report.getState()); + + String nsId = report.getNameserviceId(); + String bpId = report.getBlockPoolId(); + String cId = report.getClusterId(); + 
@SuppressWarnings("unchecked") + List existingItems = + (List) resolver.get(nsId); + if (existingItems == null) { + existingItems = new ArrayList(); + resolver.put(bpId, existingItems); + resolver.put(nsId, existingItems); + } + boolean added = false; + for (int i=0; i getNamespaces() throws IOException { + return this.namespaces; + } + + @Override + public PathLocation getDestinationForPath(String path) throws IOException { + String finalPath = null; + String nameservice = null; + Set namespaceSet = new HashSet(); + LinkedList remoteLocations = + new LinkedList(); + for(String key : this.locations.keySet()) { + if(path.startsWith(key)) { + for (RemoteLocation location : this.locations.get(key)) { + finalPath = location.getDest() + path.substring(key.length()); + nameservice = location.getNameserviceId(); + RemoteLocation remoteLocation = + new RemoteLocation(nameservice, finalPath); + remoteLocations.add(remoteLocation); + namespaceSet.add(nameservice); + } + break; + } + } + if (remoteLocations.isEmpty()) { + // Path isn't supported, mimic resolver behavior. + return null; + } + return new PathLocation(path, remoteLocations, namespaceSet); + } + + @Override + public List getMountPoints(String path) throws IOException { + List mounts = new ArrayList(); + if (path.equals("/")) { + // Mounts only supported under root level + for (String mount : this.locations.keySet()) { + if (mount.length() > 1) { + // Remove leading slash, this is the behavior of the mount tree, + // return only names. 
+ mounts.add(mount.replace("/", "")); + } + } + } + return mounts; + } + + @Override + public void setRouterId(String router) { + } + + @Override + public String getDefaultNamespace() { + return defaultNamespace; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51de708/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java new file mode 100644 index 0000000..16d624c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import org.apache.hadoop.conf.Configuration; + +/** + * Constructs a router configuration with individual features enabled/disabled. 
+ */ +public class RouterConfigBuilder { + + private Configuration conf; + + public RouterConfigBuilder(Configuration configuration) { + this.conf = configuration; + } + + public RouterConfigBuilder() { + this.conf = new Configuration(); + } + + public Configuration build() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51de708/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java new file mode 100644 index 0000000..55d04ad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -0,0 +1,767 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; + +/** + * Test utility to mimic a federated HDFS cluster with a router. + */ +public class RouterDFSCluster { + /** + * Router context. 
+ */ + public class RouterContext { + private Router router; + private FileContext fileContext; + private String nameserviceId; + private String namenodeId; + private int rpcPort; + private DFSClient client; + private Configuration conf; + private URI fileSystemUri; + + public RouterContext(Configuration conf, String ns, String nn) + throws URISyntaxException { + this.namenodeId = nn; + this.nameserviceId = ns; + this.conf = conf; + router = new Router(); + router.init(conf); + } + + public Router getRouter() { + return this.router; + } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.namenodeId; + } + + public int getRpcPort() { + return this.rpcPort; + } + + public FileContext getFileContext() { + return this.fileContext; + } + + public void initRouter() throws URISyntaxException { + } + + public DistributedFileSystem getFileSystem() throws IOException { + DistributedFileSystem fs = + (DistributedFileSystem) DistributedFileSystem.get(conf); + return fs; + } + + public DFSClient getClient(UserGroupInformation user) + throws IOException, URISyntaxException, InterruptedException { + + LOG.info("Connecting to router at " + fileSystemUri); + return user.doAs(new PrivilegedExceptionAction() { + @Override + public DFSClient run() throws IOException { + return new DFSClient(fileSystemUri, conf); + } + }); + } + + public DFSClient getClient() throws IOException, URISyntaxException { + + if (client == null) { + LOG.info("Connecting to router at " + fileSystemUri); + client = new DFSClient(fileSystemUri, conf); + } + return client; + } + } + + /** + * Namenode context. 
+ */ + public class NamenodeContext { + private NameNode namenode; + private String nameserviceId; + private String namenodeId; + private FileContext fileContext; + private int rpcPort; + private int servicePort; + private int lifelinePort; + private int httpPort; + private URI fileSystemUri; + private int index; + private Configuration conf; + private DFSClient client; + + public NamenodeContext(Configuration conf, String ns, String nn, + int index) { + this.conf = conf; + this.namenodeId = nn; + this.nameserviceId = ns; + this.index = index; + } + + public NameNode getNamenode() { + return this.namenode; + } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.namenodeId; + } + + public FileContext getFileContext() { + return this.fileContext; + } + + public void setNamenode(NameNode n) throws URISyntaxException { + namenode = n; + + // Store the bound ports and override the default FS with the local NN's + // RPC + rpcPort = n.getNameNodeAddress().getPort(); + servicePort = n.getServiceRpcAddress().getPort(); + lifelinePort = n.getServiceRpcAddress().getPort(); + httpPort = n.getHttpAddress().getPort(); + fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); + DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + + try { + this.fileContext = FileContext.getFileContext(conf); + } catch (UnsupportedFileSystemException e) { + this.fileContext = null; + } + } + + public String getRpcAddress() { + return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort; + } + + public String getServiceAddress() { + return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort; + } + + public String getLifelineAddress() { + return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort; + } + + public String getHttpAddress() { + return namenode.getHttpAddress().getHostName() + ":" + httpPort; + } + + public DistributedFileSystem getFileSystem() throws IOException { + 
DistributedFileSystem fs = + (DistributedFileSystem) DistributedFileSystem.get(conf); + return fs; + } + + public void resetClient() { + client = null; + } + + public DFSClient getClient(UserGroupInformation user) + throws IOException, URISyntaxException, InterruptedException { + + LOG.info("Connecting to namenode at " + fileSystemUri); + return user.doAs(new PrivilegedExceptionAction() { + @Override + public DFSClient run() throws IOException { + return new DFSClient(fileSystemUri, conf); + } + }); + } + + public DFSClient getClient() throws IOException, URISyntaxException { + if (client == null) { + LOG.info("Connecting to namenode at " + fileSystemUri); + client = new DFSClient(fileSystemUri, conf); + } + return client; + } + + public String getConfSuffix() { + String suffix = nameserviceId; + if (highAvailability) { + suffix += "." + namenodeId; + } + return suffix; + } + } + + public static final String NAMENODE1 = "nn0"; + public static final String NAMENODE2 = "nn1"; + public static final String NAMENODE3 = "nn2"; + public static final String TEST_STRING = "teststring"; + public static final String TEST_DIR = "testdir"; + public static final String TEST_FILE = "testfile"; + + private List nameservices; + private List routers; + private List namenodes; + private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class); + private MiniDFSCluster cluster; + private boolean highAvailability; + + protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5; + protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5; + private Configuration routerOverrides; + private Configuration namenodeOverrides; + + private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2; + + public RouterDFSCluster(boolean ha, int numNameservices) { + this(ha, numNameservices, 2); + } + + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { + this.highAvailability = ha; + configureNameservices(numNameservices, numNamenodes); + } + + public void 
addRouterOverrides(Configuration conf) { + if (this.routerOverrides == null) { + this.routerOverrides = conf; + } else { + this.routerOverrides.addResource(conf); + } + } + + public void addNamenodeOverrides(Configuration conf) { + if (this.namenodeOverrides == null) { + this.namenodeOverrides = conf; + } else { + this.namenodeOverrides.addResource(conf); + } + } + + public Configuration generateNamenodeConfiguration( + String defaultNameserviceId) { + Configuration c = new HdfsConfiguration(); + + c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey()); + c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId); + + for (String ns : nameservices) { + if (highAvailability) { + c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES); + } + + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + + c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.rpcPort); + c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.httpPort); + c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." 
+ suffix, + "0.0.0.0"); + } + } + + if (namenodeOverrides != null) { + c.addResource(namenodeOverrides); + } + return c; + } + + public Configuration generateClientConfiguration() { + Configuration conf = new HdfsConfiguration(); + conf.addResource(generateNamenodeConfiguration(getNameservices().get(0))); + return conf; + } + + public Configuration generateRouterConfiguration(String localNameserviceId, + String localNamenodeId) throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.addResource(generateNamenodeConfiguration(localNameserviceId)); + + // Use mock resolver classes + conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + + // Set the nameservice ID for the default NN monitor + conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId); + + if (localNamenodeId != null) { + conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId); + } + + StringBuilder routerBuilder = new StringBuilder(); + for (String ns : nameservices) { + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + + if (routerBuilder.length() != 0) { + routerBuilder.append(","); + } + routerBuilder.append(suffix); + } + } + + return conf; + } + + public void configureNameservices(int numNameservices, int numNamenodes) { + nameservices = new ArrayList(); + for (int i = 0; i < numNameservices; i++) { + nameservices.add("ns" + i); + } + namenodes = new ArrayList(); + int index = 0; + for (String ns : nameservices) { + Configuration nnConf = generateNamenodeConfiguration(ns); + if (highAvailability) { + NamenodeContext context = + new NamenodeContext(nnConf, ns, NAMENODE1, index); + namenodes.add(context); + index++; + + if (numNamenodes > 1) { + context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1); + namenodes.add(context); + index++; + } + + if 
(numNamenodes > 2) { + context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1); + namenodes.add(context); + index++; + } + + } else { + NamenodeContext context = new NamenodeContext(nnConf, ns, null, index); + namenodes.add(context); + index++; + } + } + } + + public String getNameservicesKey() { + StringBuilder ns = new StringBuilder(); + for (int i = 0; i < nameservices.size(); i++) { + if (i > 0) { + ns.append(","); + } + ns.append(nameservices.get(i)); + } + return ns.toString(); + } + + public String getRandomNameservice() { + Random r = new Random(); + return nameservices.get(r.nextInt(nameservices.size())); + } + + public List getNameservices() { + return nameservices; + } + + public List getNamenodes(String nameservice) { + ArrayList nns = new ArrayList(); + for (NamenodeContext c : namenodes) { + if (c.nameserviceId.equals(nameservice)) { + nns.add(c); + } + } + return nns; + } + + public NamenodeContext getRandomNamenode() { + Random rand = new Random(); + return namenodes.get(rand.nextInt(namenodes.size())); + } + + public List getNamenodes() { + return namenodes; + } + + public boolean isHighAvailability() { + return highAvailability; + } + + public NamenodeContext getNamenode(String nameservice, + String namenode) { + for (NamenodeContext c : namenodes) { + if (c.nameserviceId.equals(nameservice)) { + if (namenode == null || c.namenodeId == null || namenode.isEmpty() + || c.namenodeId.isEmpty()) { + return c; + } else if (c.namenodeId.equals(namenode)) { + return c; + } + } + } + return null; + } + + public List getRouters(String nameservice) { + ArrayList nns = new ArrayList(); + for (RouterContext c : routers) { + if (c.nameserviceId.equals(nameservice)) { + nns.add(c); + } + } + return nns; + } + + public RouterContext getRouterContext(String nameservice, + String namenode) { + for (RouterContext c : routers) { + if (namenode == null) { + return c; + } + if (c.namenodeId.equals(namenode) + && c.nameserviceId.equals(nameservice)) { + return 
c; + } + } + return null; + } + + public RouterContext getRandomRouter() { + Random rand = new Random(); + return routers.get(rand.nextInt(routers.size())); + } + + public List getRouters() { + return routers; + } + + public RouterContext buildRouter(String nameservice, String namenode) + throws URISyntaxException, IOException { + Configuration config = generateRouterConfiguration(nameservice, namenode); + RouterContext rc = new RouterContext(config, nameservice, namenode); + return rc; + } + + public void startCluster() { + startCluster(null); + } + + public void startCluster(Configuration overrideConf) { + try { + MiniDFSNNTopology topology = new MiniDFSNNTopology(); + for (String ns : nameservices) { + + NSConf conf = new MiniDFSNNTopology.NSConf(ns); + if (highAvailability) { + for(int i = 0; i < namenodes.size()/nameservices.size(); i++) { + NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i); + conf.addNN(nnConf); + } + } else { + NNConf nnConf = new MiniDFSNNTopology.NNConf(null); + conf.addNN(nnConf); + } + topology.addNameservice(conf); + } + topology.setFederation(true); + + // Start mini DFS cluster + Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0)); + if (overrideConf != null) { + nnConf.addResource(overrideConf); + } + cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build(); + cluster.waitActive(); + + // Store NN pointers + for (int i = 0; i < namenodes.size(); i++) { + NameNode nn = cluster.getNameNode(i); + namenodes.get(i).setNamenode(nn); + } + + } catch (Exception e) { + LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e); + cluster.shutdown(); + } + } + + public void startRouters() + throws InterruptedException, URISyntaxException, IOException { + // Create routers + routers = new ArrayList(); + for (String ns : nameservices) { + + for (NamenodeContext context : getNamenodes(ns)) { + routers.add(buildRouter(ns, context.namenodeId)); + } + } + + // Start all routers + for (RouterContext 
router : routers) { + router.router.start(); + } + // Wait until all routers are active and record their ports + for (RouterContext router : routers) { + waitActive(router); + router.initRouter(); + } + } + + public void waitActive(NamenodeContext nn) throws IOException { + cluster.waitActive(nn.index); + } + + public void waitActive(RouterContext router) + throws InterruptedException { + for (int loopCount = 0; loopCount < 20; loopCount++) { + // Validate connection of routers to NNs + if (router.router.getServiceState() == STATE.STARTED) { + return; + } + Thread.sleep(1000); + } + assertFalse( + "Timeout waiting for " + router.router.toString() + " to activate.", + true); + } + + + public void registerNamenodes() throws IOException { + for (RouterContext r : routers) { + ActiveNamenodeResolver resolver = r.router.getNamenodeResolver(); + for (NamenodeContext nn : namenodes) { + // Generate a report + NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId, + nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(), + nn.getLifelineAddress(), nn.getHttpAddress()); + report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage() + .getStorage().getNamespaceInfo()); + + // Determine HA state from nn public state string + String nnState = nn.namenode.getState(); + HAServiceState haState = HAServiceState.ACTIVE; + for (HAServiceState state : HAServiceState.values()) { + if (nnState.equalsIgnoreCase(state.name())) { + haState = state; + break; + } + } + report.setHAServiceState(haState); + + // Register with the resolver + resolver.registerNamenode(report); + } + } + } + + public void waitNamenodeRegistration() + throws InterruptedException, IllegalStateException, IOException { + for (RouterContext r : routers) { + for (NamenodeContext nn : namenodes) { + FederationTestUtils.waitNamenodeRegistered( + r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId, + null); + } + } + } + + public void waitRouterRegistrationQuorum(RouterContext router, 
+ FederationNamenodeServiceState state, String nameservice, String namenode) + throws InterruptedException, IOException { + LOG.info("Waiting for NN - " + nameservice + ":" + namenode + + " to transition to state - " + state); + FederationTestUtils.waitNamenodeRegistered( + router.router.getNamenodeResolver(), nameservice, namenode, state); + } + + public String getFederatedPathForNameservice(String ns) { + return "/" + ns; + } + + public String getNamenodePathForNameservice(String ns) { + return "/target-" + ns; + } + + /** + * @return example: + *
    + *
  • /ns0/testdir which maps to ns0->/target-ns0/testdir + *
+ */ + public String getFederatedTestDirectoryForNameservice(String ns) { + return getFederatedPathForNameservice(ns) + "/" + TEST_DIR; + } + + /** + * @return example: + *
    + *
  • /target-ns0/testdir + *
+ */ + public String getNamenodeTestDirectoryForNameservice(String ns) { + return getNamenodePathForNameservice(ns) + "/" + TEST_DIR; + } + + /** + * @return example: + *
    + *
  • /ns0/testfile which maps to ns0->/target-ns0/testfile + *
+ */ + public String getFederatedTestFileForNameservice(String ns) { + return getFederatedPathForNameservice(ns) + "/" + TEST_FILE; + } + + /** + * @return example: + *
    + *
  • /target-ns0/testfile + *
+ */ + public String getNamenodeTestFileForNameservice(String ns) { + return getNamenodePathForNameservice(ns) + "/" + TEST_FILE; + } + + public void shutdown() { + cluster.shutdown(); + if (routers != null) { + for (RouterContext context : routers) { + stopRouter(context); + } + } + } + + public void stopRouter(RouterContext router) { + try { + + router.router.shutDown(); + + int loopCount = 0; + while (router.router.getServiceState() != STATE.STOPPED) { + loopCount++; + Thread.sleep(1000); + if (loopCount > 20) { + LOG.error("Unable to shutdown router - " + router.rpcPort); + break; + } + } + } catch (InterruptedException e) { + } + } + + ///////////////////////////////////////////////////////////////////////////// + // Namespace Test Fixtures + ///////////////////////////////////////////////////////////////////////////// + + /** + * Creates test directories via the namenode. + * 1) /target-ns0/testfile + * 2) /target-ns1/testfile + * @throws IOException + */ + public void createTestDirectoriesNamenode() throws IOException { + // Add a test dir to each NS and verify + for (String ns : getNameservices()) { + NamenodeContext context = getNamenode(ns, null); + if (!createTestDirectoriesNamenode(context)) { + throw new IOException("Unable to create test directory for ns - " + ns); + } + } + } + + public boolean createTestDirectoriesNamenode(NamenodeContext nn) + throws IOException { + return FederationTestUtils.addDirectory(nn.getFileSystem(), + getNamenodeTestDirectoryForNameservice(nn.nameserviceId)); + } + + public void deleteAllFiles() throws IOException { + // Delete all files via the NNs and verify + for (NamenodeContext context : getNamenodes()) { + FileStatus[] status = context.getFileSystem().listStatus(new Path("/")); + for(int i = 0; i + *
  • / -> [ns0->/]. + *
  • /ns0 -> ns0->/target-ns0. + *
  • /ns1 -> ns1->/target-ns1. + * + */ + public void installMockLocations() { + for (RouterContext routerContext : routers) { + MockResolver mockResolver = + (MockResolver) routerContext.router.getSubclusterResolver(); + // Register one mount entry per nameservice + for (String nsId : nameservices) { + // Federated path -> namenode path within the same nameservice + String mountPoint = getFederatedPathForNameservice(nsId); + String target = getNamenodePathForNameservice(nsId); + mockResolver.addLocation(mountPoint, nsId, target); + } + + // The root path resolves to the root of the first nameservice + mockResolver.addLocation("/", nameservices.get(0), "/"); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51de708/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java new file mode 100644 index 0000000..8c720c7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.service.Service.STATE; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the service lifecycle (init, start, stop and close) of the + * {@link Router} using mock resolvers. + */ +public class TestRouter { + + private static Configuration conf; + + /** + * Builds the shared configuration: mock resolver classes and a single + * simulated nameservice {@code ns0}. + */ + @BeforeClass + public static void create() throws IOException { + // Basic configuration without the state store + conf = new Configuration(); + // Mock resolver classes + conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + + // Simulate a co-located NN + conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0"); + conf.set("fs.defaultFS", "hdfs://" + "ns0"); + // Use port 0 for both the RPC and HTTP addresses rather than a fixed port + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + "ns0", + "127.0.0.1:" + 0); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + "ns0", + "127.0.0.1:" + 0); + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." 
+ "ns0", + "0.0.0.0"); + } + + @AfterClass + public static void destroy() { + } + + @Before + public void setup() throws IOException, URISyntaxException { + } + + @After + public void cleanup() { + } + + /** + * Walks a new {@link Router} through init, start and stop, asserting the + * service state after each step. The router is always closed, even when an + * assertion fails, so a started router is not left behind. + * + * @param routerConfig Configuration used to initialize the router. + * @throws IOException Propagated from the router calls (e.g. close). + */ + private static void testRouterStartup(Configuration routerConfig) + throws InterruptedException, IOException { + Router router = new Router(); + try { + assertEquals(STATE.NOTINITED, router.getServiceState()); + router.init(routerConfig); + assertEquals(STATE.INITED, router.getServiceState()); + router.start(); + assertEquals(STATE.STARTED, router.getServiceState()); + router.stop(); + assertEquals(STATE.STOPPED, router.getServiceState()); + } finally { + router.close(); + } + } + + /** + * Runs the lifecycle check with the configuration produced by + * {@link RouterConfigBuilder}. + */ + @Test + public void testRouterService() throws InterruptedException, IOException { + + // Run with all services + testRouterStartup((new RouterConfigBuilder(conf)).build()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org