From: arp@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Wed, 24 Jun 2015 17:50:52 -0000
Message-Id: <1becfbac94a1424b9db1a70197a2f46f@git.apache.org>
Subject: [14/21] hadoop git commit: HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index fdbacdc..0a21886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -62,6 +62,8 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -445,7 +447,7 @@ public class MiniDFSCluster {
     final int numNameNodes = builder.nnTopology.countNameNodes();
     LOG.info("starting cluster: numNameNodes=" + numNameNodes + ", numDataNodes=" + builder.numDataNodes);
-    nameNodes = new NameNodeInfo[numNameNodes];
+    this.storagesPerDatanode = builder.storagesPerDatanode;
     // Duplicate the storageType setting for each DN.
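
The removed array sizing in the hunk above follows from the core bookkeeping change in this file: as the hunks below show, MiniDFSCluster drops the fixed NameNodeInfo[] and tracks namenodes in a Guava Multimap<String, NameNodeInfo> keyed by nameservice id, so a single nameservice can hold any number of namenodes. A minimal, self-contained sketch of that pattern (the sketch class and its trimmed-down NameNodeInfo are illustrative stand-ins, not the patch itself):

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

class NameNodeBookkeepingSketch {
  // Stand-in for MiniDFSCluster.NameNodeInfo; only the fields needed here.
  static class NameNodeInfo {
    final String nameserviceId;
    final String nnId;
    NameNodeInfo(String nameserviceId, String nnId) {
      this.nameserviceId = nameserviceId;
      this.nnId = nnId;
    }
  }

  public static void main(String[] args) {
    // One nameservice can now own any number of namenodes.
    Multimap<String, NameNodeInfo> namenodes = ArrayListMultimap.create();
    namenodes.put("ns1", new NameNodeInfo("ns1", "nn1"));
    namenodes.put("ns1", new NameNodeInfo("ns1", "nn2"));
    namenodes.put("ns1", new NameNodeInfo("ns1", "nn3"));

    // Index-based access (the old nameNodes[i]) becomes iteration over values(),
    // which is what the new getNN(int) helper in this patch does.
    int i = 0;
    for (NameNodeInfo nn : namenodes.values()) {
      System.out.println("namenode " + i++ + " = " + nn.nameserviceId + "/" + nn.nnId);
    }

    // Per-nameservice lookups use the nameservice id as the key.
    System.out.println("ns1 has " + namenodes.get("ns1").size() + " namenodes");
  }
}
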
@@ -515,7 +517,7 @@ public class MiniDFSCluster { } private Configuration conf; - private NameNodeInfo[] nameNodes; + private Multimap namenodes = ArrayListMultimap.create(); protected int numDataNodes; protected final List dataNodes = new ArrayList(); @@ -539,10 +541,10 @@ public class MiniDFSCluster { * Stores the information related to a namenode in the cluster */ public static class NameNodeInfo { - final NameNode nameNode; - final Configuration conf; - final String nameserviceId; - final String nnId; + public NameNode nameNode; + Configuration conf; + String nameserviceId; + String nnId; StartupOption startOpt; NameNodeInfo(NameNode nn, String nameserviceId, String nnId, StartupOption startOpt, Configuration conf) { @@ -563,7 +565,6 @@ public class MiniDFSCluster { * without a name node (ie when the name node is started elsewhere). */ public MiniDFSCluster() { - nameNodes = new NameNodeInfo[0]; // No namenode in the cluster storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE; synchronized (MiniDFSCluster.class) { instanceId = instanceCount++; @@ -740,7 +741,6 @@ public class MiniDFSCluster { StartupOption operation, String[] racks, String hosts[], long[] simulatedCapacities) throws IOException { - this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE; initMiniDFSCluster(conf, numDataNodes, null, format, manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, @@ -814,7 +814,7 @@ public class MiniDFSCluster { createNameNodesAndSetConf( nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs, enableManagedDfsDirsRedundancy, - format, startOpt, clusterId, conf); + format, startOpt, clusterId); } catch (IOException ioe) { LOG.error("IOE creating namenodes. Permissions dump:\n" + createPermissionsDiagnosisString(data_dir), ioe); @@ -871,7 +871,127 @@ public class MiniDFSCluster { private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy, boolean format, + StartupOption operation, String clusterId) throws IOException { + // do the basic namenode configuration + configureNameNodes(nnTopology, federation, conf); + + int nnCounter = 0; + int nsCounter = 0; + // configure each NS independently + for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) { + configureNameService(nameservice, nsCounter++, manageNameDfsSharedDirs, + manageNameDfsDirs, enableManagedDfsDirsRedundancy, + format, operation, clusterId, nnCounter); + nnCounter += nameservice.getNNs().size(); + } + } + + /** + * Do the rest of the NN configuration for things like shared edits, + * as well as directory formatting, etc. for a single nameservice + * @param nnCounter the count of the number of namenodes already configured/started. Also, + * acts as the index to the next NN to start (since indicies start at 0). + * @throws IOException + */ + private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCounter, + boolean manageNameDfsSharedDirs, boolean manageNameDfsDirs, boolean + enableManagedDfsDirsRedundancy, boolean format, StartupOption operation, String clusterId, + final int nnCounter) throws IOException{ + String nsId = nameservice.getId(); + String lastDefaultFileSystem = null; + + // If HA is enabled on this nameservice, enumerate all the namenodes + // in the configuration. 
Also need to set a shared edits dir + int numNNs = nameservice.getNNs().size(); + if (numNNs > 1 && manageNameDfsSharedDirs) { + URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter + numNNs - 1); + conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString()); + // Clean out the shared edits dir completely, including all subdirectories. + FileUtil.fullyDelete(new File(sharedEditsUri)); + } + + // Now format first NN and copy the storage directory from that node to the others. + int nnIndex = nnCounter; + Collection prevNNDirs = null; + for (NNConf nn : nameservice.getNNs()) { + initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs, + manageNameDfsDirs, nnIndex); + Collection namespaceDirs = FSNamesystem.getNamespaceDirs(conf); + if (format) { + // delete the existing namespaces + for (URI nameDirUri : namespaceDirs) { + File nameDir = new File(nameDirUri); + if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) { + throw new IOException("Could not fully delete " + nameDir); + } + } + + // delete the checkpoint directories, if they exist + Collection checkpointDirs = Util.stringCollectionAsURIs(conf + .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY)); + for (URI checkpointDirUri : checkpointDirs) { + File checkpointDir = new File(checkpointDirUri); + if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) { + throw new IOException("Could not fully delete " + checkpointDir); + } + } + } + + boolean formatThisOne = format; + // if we are looking at not the first NN + if (nnIndex++ > nnCounter && format) { + // Don't format the second, third, etc NN in an HA setup - that + // would result in it having a different clusterID, + // block pool ID, etc. Instead, copy the name dirs + // from the previous one. + formatThisOne = false; + assert (null != prevNNDirs); + copyNameDirs(prevNNDirs, namespaceDirs, conf); + } + + if (formatThisOne) { + // Allow overriding clusterID for specific NNs to test + // misconfiguration. + if (nn.getClusterId() == null) { + StartupOption.FORMAT.setClusterId(clusterId); + } else { + StartupOption.FORMAT.setClusterId(nn.getClusterId()); + } + DFSTestUtil.formatNameNode(conf); + } + prevNNDirs = namespaceDirs; + } + + // create all the namenodes in the namespace + nnIndex = nnCounter; + for (NNConf nn : nameservice.getNNs()) { + initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs, + enableManagedDfsDirsRedundancy, nnIndex++); + NameNodeInfo info = createNameNode(conf, false, operation, + clusterId, nsId, nn.getNnId()); + + // Record the last namenode uri + if (info != null && info.conf != null) { + lastDefaultFileSystem = + info.conf.get(FS_DEFAULT_NAME_KEY); + } + } + if (!federation && lastDefaultFileSystem != null) { + // Set the default file system to the actual bind address of NN. + conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem); + } + } + + /** + * Do the basic NN configuration for the topology. Does not configure things like the shared + * edits directories + * @param nnTopology + * @param federation + * @param conf + * @throws IOException + */ + public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation, Configuration conf) throws IOException { Preconditions.checkArgument(nnTopology.countNameNodes() > 0, "empty NN topology: no namenodes specified!"); @@ -884,22 +1004,21 @@ public class MiniDFSCluster { // NN is started. 
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:" + onlyNN.getIpcPort()); } - + List allNsIds = Lists.newArrayList(); for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) { if (nameservice.getId() != null) { allNsIds.add(nameservice.getId()); } } + if (!allNsIds.isEmpty()) { conf.set(DFS_NAMESERVICES, Joiner.on(",").join(allNsIds)); } - - int nnCounter = 0; + for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) { String nsId = nameservice.getId(); - String lastDefaultFileSystem = null; - + Preconditions.checkArgument( !federation || nsId != null, "if there is more than one NS, they must have names"); @@ -918,85 +1037,10 @@ public class MiniDFSCluster { // If HA is enabled on this nameservice, enumerate all the namenodes // in the configuration. Also need to set a shared edits dir if (nnIds.size() > 1) { - conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), - Joiner.on(",").join(nnIds)); - if (manageNameDfsSharedDirs) { - URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1); - conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString()); - // Clean out the shared edits dir completely, including all subdirectories. - FileUtil.fullyDelete(new File(sharedEditsUri)); - } - } - - // Now format first NN and copy the storage directory from that node to the others. - int i = 0; - Collection prevNNDirs = null; - int nnCounterForFormat = nnCounter; - for (NNConf nn : nameservice.getNNs()) { - initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs, - nnCounterForFormat); - Collection namespaceDirs = FSNamesystem.getNamespaceDirs(conf); - if (format) { - for (URI nameDirUri : namespaceDirs) { - File nameDir = new File(nameDirUri); - if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) { - throw new IOException("Could not fully delete " + nameDir); - } - } - Collection checkpointDirs = Util.stringCollectionAsURIs(conf - .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY)); - for (URI checkpointDirUri : checkpointDirs) { - File checkpointDir = new File(checkpointDirUri); - if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) { - throw new IOException("Could not fully delete " + checkpointDir); - } - } - } - - boolean formatThisOne = format; - if (format && i++ > 0) { - // Don't format the second NN in an HA setup - that - // would result in it having a different clusterID, - // block pool ID, etc. Instead, copy the name dirs - // from the first one. - formatThisOne = false; - assert (null != prevNNDirs); - copyNameDirs(prevNNDirs, namespaceDirs, conf); - } - - nnCounterForFormat++; - if (formatThisOne) { - // Allow overriding clusterID for specific NNs to test - // misconfiguration. 
- if (nn.getClusterId() == null) { - StartupOption.FORMAT.setClusterId(clusterId); - } else { - StartupOption.FORMAT.setClusterId(nn.getClusterId()); - } - DFSTestUtil.formatNameNode(conf); - } - prevNNDirs = namespaceDirs; - } - - // Start all Namenodes - for (NNConf nn : nameservice.getNNs()) { - initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, - enableManagedDfsDirsRedundancy, nnCounter); - createNameNode(nnCounter, conf, numDataNodes, false, operation, - clusterId, nsId, nn.getNnId()); - // Record the last namenode uri - if (nameNodes[nnCounter] != null && nameNodes[nnCounter].conf != null) { - lastDefaultFileSystem = - nameNodes[nnCounter].conf.get(FS_DEFAULT_NAME_KEY); - } - nnCounter++; - } - if (!federation && lastDefaultFileSystem != null) { - // Set the default file system to the actual bind address of NN. - conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem); + conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner + .on(",").join(nnIds)); } } - } public URI getSharedEditsDir(int minNN, int maxNN) throws IOException { @@ -1010,39 +1054,92 @@ public class MiniDFSCluster { } public NameNodeInfo[] getNameNodeInfos() { - return this.nameNodes; + return this.namenodes.values().toArray(new NameNodeInfo[0]); } - private void initNameNodeConf(Configuration conf, - String nameserviceId, String nnId, - boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, - int nnIndex) throws IOException { + /** + * @param nsIndex index of the namespace id to check + * @return all the namenodes bound to the given namespace index + */ + public NameNodeInfo[] getNameNodeInfos(int nsIndex) { + int i = 0; + for (String ns : this.namenodes.keys()) { + if (i++ == nsIndex) { + return this.namenodes.get(ns).toArray(new NameNodeInfo[0]); + } + } + return null; + } + + /** + * @param nameservice id of nameservice to read + * @return all the namenodes bound to the given namespace index + */ + public NameNodeInfo[] getNameNodeInfos(String nameservice) { + for (String ns : this.namenodes.keys()) { + if (nameservice.equals(ns)) { + return this.namenodes.get(ns).toArray(new NameNodeInfo[0]); + } + } + return null; + } + + + private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId, + boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex) + throws IOException { if (nameserviceId != null) { conf.set(DFS_NAMESERVICE_ID, nameserviceId); } if (nnId != null) { conf.set(DFS_HA_NAMENODE_ID_KEY, nnId); } - if (manageNameDfsDirs) { if (enableManagedDfsDirsRedundancy) { - conf.set(DFS_NAMENODE_NAME_DIR_KEY, - fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+ - fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2)))); - conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, - fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+ - fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2)))); + File[] files = getNameNodeDirectory(nsIndex, nnIndex); + conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1])); + files = getCheckpointDirectory(nsIndex, nnIndex); + conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1])); } else { - conf.set(DFS_NAMENODE_NAME_DIR_KEY, - fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))). - toString()); - conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, - fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))). 
- toString()); + File[] files = getNameNodeDirectory(nsIndex, nnIndex); + conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]).toString()); + files = getCheckpointDirectory(nsIndex, nnIndex); + conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]).toString()); } } } + private File[] getNameNodeDirectory(int nameserviceIndex, int nnIndex) { + return getNameNodeDirectory(base_dir, nameserviceIndex, nnIndex); + } + + public static File[] getNameNodeDirectory(String base_dir, int nsIndex, int nnIndex) { + return getNameNodeDirectory(new File(base_dir), nsIndex, nnIndex); + } + + public static File[] getNameNodeDirectory(File base_dir, int nsIndex, int nnIndex) { + File[] files = new File[2]; + files[0] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 1)); + files[1] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 2)); + return files; + } + + public File[] getCheckpointDirectory(int nsIndex, int nnIndex) { + return getCheckpointDirectory(base_dir, nsIndex, nnIndex); + } + + public static File[] getCheckpointDirectory(String base_dir, int nsIndex, int nnIndex) { + return getCheckpointDirectory(new File(base_dir), nsIndex, nnIndex); + } + + public static File[] getCheckpointDirectory(File base_dir, int nsIndex, int nnIndex) { + File[] files = new File[2]; + files[0] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 1)); + files[1] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 2)); + return files; + } + + public static void copyNameDirs(Collection srcDirs, Collection dstDirs, Configuration dstConf) throws IOException { URI srcDir = Lists.newArrayList(srcDirs).get(0); @@ -1094,12 +1191,9 @@ public class MiniDFSCluster { new String[] {} : new String[] {operation.getName()}; return args; } - - private void createNameNode(int nnIndex, Configuration conf, - int numDataNodes, boolean format, StartupOption operation, - String clusterId, String nameserviceId, - String nnId) - throws IOException { + + private NameNodeInfo createNameNode(Configuration conf, boolean format, StartupOption operation, + String clusterId, String nameserviceId, String nnId) throws IOException { // Format and clean out DataNode directories if (format) { DFSTestUtil.formatNameNode(conf); @@ -1113,7 +1207,7 @@ public class MiniDFSCluster { String[] args = createArgs(operation); NameNode nn = NameNode.createNameNode(args, conf); if (operation == StartupOption.RECOVER) { - return; + return null; } // After the NN has started, set back the bound ports into @@ -1131,14 +1225,17 @@ public class MiniDFSCluster { DFSUtil.setGenericConf(conf, nameserviceId, nnId, DFS_NAMENODE_HTTP_ADDRESS_KEY); - nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, + NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId, operation, new Configuration(conf)); + namenodes.put(nameserviceId, info); + // Restore the default fs name if (originalDefaultFs == null) { conf.set(FS_DEFAULT_NAME_KEY, ""); } else { conf.set(FS_DEFAULT_NAME_KEY, originalDefaultFs); } + return info; } /** @@ -1154,7 +1251,7 @@ public class MiniDFSCluster { */ public URI getURI(int nnIndex) { String hostPort = - nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString(); + getNN(nnIndex).nameNode.getNameNodeAddressHostPortString(); URI uri = null; try { uri = new URI("hdfs://" + hostPort); @@ -1172,9 +1269,21 @@ public class MiniDFSCluster { * @return Configuration of for the given namenode */ public Configuration getConfiguration(int nnIndex) { - return nameNodes[nnIndex].conf; + 
return getNN(nnIndex).conf; } + private NameNodeInfo getNN(int nnIndex) { + int count = 0; + for (NameNodeInfo nn : namenodes.values()) { + if (count == nnIndex) { + return nn; + } + count++; + } + return null; + } + + /** * wait for the given namenode to get out of safemode. */ @@ -1593,7 +1702,7 @@ public class MiniDFSCluster { * @throws Exception */ public void finalizeCluster(int nnIndex, Configuration conf) throws Exception { - finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf); + finalizeNamenode(getNN(nnIndex).nameNode, getNN(nnIndex).conf); } /** @@ -1604,7 +1713,7 @@ public class MiniDFSCluster { * @throws IllegalStateException if the Namenode is not running. */ public void finalizeCluster(Configuration conf) throws Exception { - for (NameNodeInfo nnInfo : nameNodes) { + for (NameNodeInfo nnInfo : namenodes.values()) { if (nnInfo == null) { throw new IllegalStateException("Attempting to finalize " + "Namenode but it is not running"); @@ -1612,9 +1721,9 @@ public class MiniDFSCluster { finalizeNamenode(nnInfo.nameNode, nnInfo.conf); } } - + public int getNumNameNodes() { - return nameNodes.length; + return namenodes.size(); } /** @@ -1644,7 +1753,7 @@ public class MiniDFSCluster { * Gets the NameNode for the index. May be null. */ public NameNode getNameNode(int nnIndex) { - return nameNodes[nnIndex].nameNode; + return getNN(nnIndex).nameNode; } /** @@ -1653,11 +1762,11 @@ public class MiniDFSCluster { */ public FSNamesystem getNamesystem() { checkSingleNameNode(); - return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode); + return NameNodeAdapter.getNamesystem(getNN(0).nameNode); } - + public FSNamesystem getNamesystem(int nnIndex) { - return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode); + return NameNodeAdapter.getNamesystem(getNN(nnIndex).nameNode); } /** @@ -1697,14 +1806,14 @@ public class MiniDFSCluster { * caller supplied port is not necessarily the actual port used. */ public int getNameNodePort(int nnIndex) { - return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort(); + return getNN(nnIndex).nameNode.getNameNodeAddress().getPort(); } /** * @return the service rpc port used by the NameNode at the given index. */ public int getNameNodeServicePort(int nnIndex) { - return nameNodes[nnIndex].nameNode.getServiceRpcAddress().getPort(); + return getNN(nnIndex).nameNode.getServiceRpcAddress().getPort(); } /** @@ -1745,7 +1854,7 @@ public class MiniDFSCluster { fileSystems.clear(); } shutdownDataNodes(); - for (NameNodeInfo nnInfo : nameNodes) { + for (NameNodeInfo nnInfo : namenodes.values()) { if (nnInfo == null) continue; NameNode nameNode = nnInfo.nameNode; if (nameNode != null) { @@ -1781,7 +1890,7 @@ public class MiniDFSCluster { * Shutdown all the namenodes. */ public synchronized void shutdownNameNodes() { - for (int i = 0; i < nameNodes.length; i++) { + for (int i = 0; i < namenodes.size(); i++) { shutdownNameNode(i); } } @@ -1790,13 +1899,15 @@ public class MiniDFSCluster { * Shutdown the namenode at a given index. 
*/ public synchronized void shutdownNameNode(int nnIndex) { - NameNode nn = nameNodes[nnIndex].nameNode; + NameNodeInfo info = getNN(nnIndex); + NameNode nn = info.nameNode; if (nn != null) { LOG.info("Shutting down the namenode"); nn.stop(); nn.join(); - Configuration conf = nameNodes[nnIndex].conf; - nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf); + info.nnId = null; + info.nameNode = null; + info.nameserviceId = null; } } @@ -1804,7 +1915,7 @@ public class MiniDFSCluster { * Restart all namenodes. */ public synchronized void restartNameNodes() throws IOException { - for (int i = 0; i < nameNodes.length; i++) { + for (int i = 0; i < namenodes.size(); i++) { restartNameNode(i, false); } waitActive(); @@ -1840,19 +1951,19 @@ public class MiniDFSCluster { */ public synchronized void restartNameNode(int nnIndex, boolean waitActive, String... args) throws IOException { - String nameserviceId = nameNodes[nnIndex].nameserviceId; - String nnId = nameNodes[nnIndex].nnId; - StartupOption startOpt = nameNodes[nnIndex].startOpt; - Configuration conf = nameNodes[nnIndex].conf; + NameNodeInfo info = getNN(nnIndex); + StartupOption startOpt = info.startOpt; + shutdownNameNode(nnIndex); if (args.length != 0) { startOpt = null; } else { args = createArgs(startOpt); } - NameNode nn = NameNode.createNameNode(args, conf); - nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt, - conf); + + NameNode nn = NameNode.createNameNode(args, info.conf); + info.nameNode = nn; + info.setStartOpt(startOpt); if (waitActive) { waitClusterUp(); LOG.info("Restarted the namenode"); @@ -2124,7 +2235,7 @@ public class MiniDFSCluster { * or if waiting for safe mode is disabled. */ public boolean isNameNodeUp(int nnIndex) { - NameNode nameNode = nameNodes[nnIndex].nameNode; + NameNode nameNode = getNN(nnIndex).nameNode; if (nameNode == null) { return false; } @@ -2142,7 +2253,7 @@ public class MiniDFSCluster { * Returns true if all the NameNodes are running and is out of Safe Mode. */ public boolean isClusterUp() { - for (int index = 0; index < nameNodes.length; index++) { + for (int index = 0; index < namenodes.size(); index++) { if (!isNameNodeUp(index)) { return false; } @@ -2172,15 +2283,13 @@ public class MiniDFSCluster { checkSingleNameNode(); return getFileSystem(0); } - + /** * Get a client handle to the DFS cluster for the namenode at given index. */ public DistributedFileSystem getFileSystem(int nnIndex) throws IOException { - DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get( - getURI(nnIndex), nameNodes[nnIndex].conf); - fileSystems.add(dfs); - return dfs; + return (DistributedFileSystem) addFileSystem(FileSystem.get(getURI(nnIndex), + getNN(nnIndex).conf)); } /** @@ -2188,17 +2297,20 @@ public class MiniDFSCluster { * This simulating different threads working on different FileSystem instances. */ public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException { - FileSystem dfs = FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf); - fileSystems.add(dfs); - return dfs; + return addFileSystem(FileSystem.newInstance(getURI(nnIndex), getNN(nnIndex).conf)); } - + + private T addFileSystem(T fs) { + fileSystems.add(fs); + return fs; + } + /** * @return a http URL */ public String getHttpUri(int nnIndex) { return "http://" - + nameNodes[nnIndex].conf + + getNN(nnIndex).conf .get(DFS_NAMENODE_HTTP_ADDRESS_KEY); } @@ -2206,14 +2318,14 @@ public class MiniDFSCluster { * Get the directories where the namenode stores its image. 
*/ public Collection getNameDirs(int nnIndex) { - return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf); + return FSNamesystem.getNamespaceDirs(getNN(nnIndex).conf); } /** * Get the directories where the namenode stores its edits. */ public Collection getNameEditsDirs(int nnIndex) throws IOException { - return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf); + return FSNamesystem.getNamespaceEditsDirs(getNN(nnIndex).conf); } public void transitionToActive(int nnIndex) throws IOException, @@ -2254,11 +2366,12 @@ public class MiniDFSCluster { /** Wait until the given namenode gets registration from all the datanodes */ public void waitActive(int nnIndex) throws IOException { - if (nameNodes.length == 0 || nameNodes[nnIndex] == null - || nameNodes[nnIndex].nameNode == null) { + if (namenodes.size() == 0 || getNN(nnIndex) == null || getNN(nnIndex).nameNode == null) { return; } - InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress(); + + NameNodeInfo info = getNN(nnIndex); + InetSocketAddress addr = info.nameNode.getServiceRpcAddress(); assert addr.getPort() != 0; DFSClient client = new DFSClient(addr, conf); @@ -2278,7 +2391,7 @@ public class MiniDFSCluster { * Wait until the cluster is active and running. */ public void waitActive() throws IOException { - for (int index = 0; index < nameNodes.length; index++) { + for (int index = 0; index < namenodes.size(); index++) { int failedCount = 0; while (true) { try { @@ -2298,7 +2411,14 @@ public class MiniDFSCluster { } LOG.info("Cluster is active"); } - + + public void printNNs() { + for (int i = 0; i < namenodes.size(); i++) { + LOG.info("Have namenode " + i + ", info:" + getNN(i)); + LOG.info(" has namenode: " + getNN(i).nameNode); + } + } + private synchronized boolean shouldWait(DatanodeInfo[] dnInfo, InetSocketAddress addr) { // If a datanode failed to start, then do not wait @@ -2696,7 +2816,7 @@ public class MiniDFSCluster { * namenode */ private void checkSingleNameNode() { - if (nameNodes.length != 1) { + if (namenodes.size() != 1) { throw new IllegalArgumentException("Namenode index is needed"); } } @@ -2712,13 +2832,9 @@ public class MiniDFSCluster { if(!federation) throw new IOException("cannot add namenode to non-federated cluster"); - int nnIndex = nameNodes.length; - int numNameNodes = nameNodes.length + 1; - NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes]; - System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length); - nameNodes = newlist; - String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1); - + int nameServiceIndex = namenodes.keys().size(); + String nameserviceId = NAMESERVICE_ID_PREFIX + (namenodes.keys().size() + 1); + String nameserviceIds = conf.get(DFS_NAMESERVICES); nameserviceIds += "," + nameserviceId; conf.set(DFS_NAMESERVICES, nameserviceIds); @@ -2726,9 +2842,11 @@ public class MiniDFSCluster { String nnId = null; initNameNodeAddress(conf, nameserviceId, new NNConf(nnId).setIpcPort(namenodePort)); - initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex); - createNameNode(nnIndex, conf, numDataNodes, true, null, null, - nameserviceId, nnId); + // figure out the current number of NNs + NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId); + int nnIndex = infos == null ? 
0 : infos.length; + initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex); + NameNodeInfo info = createNameNode(conf, true, null, null, nameserviceId, nnId); // Refresh datanodes with the newly started namenode for (DataNodeProperties dn : dataNodes) { @@ -2738,7 +2856,7 @@ public class MiniDFSCluster { // Wait for new namenode to get registrations from all the datanodes waitActive(nnIndex); - return nameNodes[nnIndex].nameNode; + return info.nameNode; } protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile, http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java index a99e9c3..b9786a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java @@ -56,10 +56,20 @@ public class MiniDFSNNTopology { * Set up an HA topology with a single HA nameservice. */ public static MiniDFSNNTopology simpleHATopology() { - return new MiniDFSNNTopology() - .addNameservice(new MiniDFSNNTopology.NSConf("minidfs-ns") - .addNN(new MiniDFSNNTopology.NNConf("nn1")) - .addNN(new MiniDFSNNTopology.NNConf("nn2"))); + return simpleHATopology(2); + } + + /** + * Set up an HA topology with a single HA nameservice. + * @param nnCount of namenodes to use with the nameservice + */ + public static MiniDFSNNTopology simpleHATopology(int nnCount) { + MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("minidfs-ns"); + for (int i = 1; i <= nnCount; i++) { + nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i)); + } + MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice); + return topology; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java index ad907f6..fae1024 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java @@ -303,12 +303,12 @@ public class TestDFSUpgradeFromImage { unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT); // Overwrite the md5 stored in the VERSION files - File baseDir = new File(MiniDFSCluster.getBaseDirectory()); + File[] nnDirs = MiniDFSCluster.getNameNodeDirectory(MiniDFSCluster.getBaseDirectory(), 0, 0); FSImageTestUtil.corruptVersionFile( - new File(baseDir, "name1/current/VERSION"), + new File(nnDirs[0], "current/VERSION"), "imageMD5Digest", "22222222222222222222222222222222"); FSImageTestUtil.corruptVersionFile( - new File(baseDir, "name2/current/VERSION"), + new File(nnDirs[1], "current/VERSION"), "imageMD5Digest", "22222222222222222222222222222222"); // Attach our own log appender so we can verify output 
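
The TestDFSUpgradeFromImage hunk above stops hard-coding "name1"/"name2" and instead asks MiniDFSCluster for the namenode directories, because name and checkpoint directories are now namespaced by both nameservice index and namenode index. A standalone sketch of the naming convention those new helpers encode (the base directory and class name below are illustrative, not part of the patch):

import java.io.File;

class NameDirLayoutSketch {
  // Mirrors the convention in the new MiniDFSCluster.getNameNodeDirectory(...):
  // two name dirs per namenode, keyed by nameservice index and namenode index.
  static File[] nameNodeDirs(File baseDir, int nsIndex, int nnIndex) {
    return new File[] {
        new File(baseDir, "name-" + nsIndex + "-" + (2 * nnIndex + 1)),
        new File(baseDir, "name-" + nsIndex + "-" + (2 * nnIndex + 2))
    };
  }

  // Same idea for the checkpoint (secondary) directories.
  static File[] checkpointDirs(File baseDir, int nsIndex, int nnIndex) {
    return new File[] {
        new File(baseDir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 1)),
        new File(baseDir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 2))
    };
  }

  public static void main(String[] args) {
    File base = new File("/tmp/minidfs"); // illustrative base directory
    for (int nn = 0; nn < 3; nn++) {
      for (File dir : nameNodeDirs(base, 0, nn)) {
        System.out.println("namenode " + nn + " name dir: " + dir);
      }
    }
    // e.g. nn 0 -> name-0-1, name-0-2; nn 1 -> name-0-3, name-0-4; nn 2 -> name-0-5, name-0-6
  }
}
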
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java index c4c890f..b50b1cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java @@ -45,6 +45,7 @@ import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertTrue; /** * This class tests rolling upgrade. @@ -66,7 +67,7 @@ public class TestRollingUpgrade { */ @Test public void testDFSAdminRollingUpgradeCommands() throws Exception { - // start a cluster + // start a cluster final Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; try { @@ -97,7 +98,7 @@ public class TestRollingUpgrade { runCmd(dfsadmin, true, "-rollingUpgrade", "query"); dfs.mkdirs(bar); - + //finalize rolling upgrade runCmd(dfsadmin, true, "-rollingUpgrade", "finalize"); @@ -143,7 +144,7 @@ public class TestRollingUpgrade { String nnDirPrefix = MiniDFSCluster.getBaseDirectory() + "/nn/"; final File nn1Dir = new File(nnDirPrefix + "image1"); final File nn2Dir = new File(nnDirPrefix + "image2"); - + LOG.info("nn1Dir=" + nn1Dir); LOG.info("nn2Dir=" + nn2Dir); @@ -186,9 +187,9 @@ public class TestRollingUpgrade { final RollingUpgradeInfo info1; { - final DistributedFileSystem dfs = cluster.getFileSystem(); + final DistributedFileSystem dfs = cluster.getFileSystem(); dfs.mkdirs(foo); - + //start rolling upgrade dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE); @@ -197,7 +198,7 @@ public class TestRollingUpgrade { //query rolling upgrade Assert.assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY)); - + dfs.mkdirs(bar); cluster.shutdown(); } @@ -209,8 +210,8 @@ public class TestRollingUpgrade { .format(false) .manageNameDfsDirs(false) .build(); - final DistributedFileSystem dfs2 = cluster2.getFileSystem(); - + final DistributedFileSystem dfs2 = cluster2.getFileSystem(); + // Check that cluster2 sees the edits made on cluster1 Assert.assertTrue(dfs2.exists(foo)); Assert.assertTrue(dfs2.exists(bar)); @@ -260,7 +261,7 @@ public class TestRollingUpgrade { @Test public void testRollback() throws IOException { - // start a cluster + // start a cluster final Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; try { @@ -305,7 +306,7 @@ public class TestRollingUpgrade { if(cluster != null) cluster.shutdown(); } } - + private static void startRollingUpgrade(Path foo, Path bar, Path file, byte[] data, MiniDFSCluster cluster) throws IOException { @@ -327,7 +328,7 @@ public class TestRollingUpgrade { TestFileTruncate.checkBlockRecovery(file, dfs); AppendTestUtil.checkFullFile(dfs, file, newLength, data); } - + private static void rollbackRollingUpgrade(Path foo, Path bar, Path file, byte[] data, MiniDFSCluster cluster) throws IOException { @@ -372,22 +373,33 @@ public class TestRollingUpgrade { } } - @Test (timeout = 300000) + @Test(timeout = 300000) public void testFinalize() throws Exception { + testFinalize(2); + } + + @Test(timeout = 300000) + public void testFinalizeWithMultipleNN() throws 
Exception { + testFinalize(3); + } + + private void testFinalize(int nnCount) throws Exception { final Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = null; final Path foo = new Path("/foo"); final Path bar = new Path("/bar"); try { - cluster = new MiniQJMHACluster.Builder(conf).build(); + cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build(); MiniDFSCluster dfsCluster = cluster.getDfsCluster(); dfsCluster.waitActive(); - // let NN1 tail editlog every 1s - dfsCluster.getConfiguration(1).setInt( - DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); - dfsCluster.restartNameNode(1); + // let other NN tail editlog every 1s + for(int i=1; i < nnCount; i++) { + dfsCluster.getConfiguration(i).setInt( + DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + } + dfsCluster.restartNameNodes(); dfsCluster.transitionToActive(0); DistributedFileSystem dfs = dfsCluster.getFileSystem(0); @@ -425,17 +437,29 @@ public class TestRollingUpgrade { @Test (timeout = 300000) public void testQuery() throws Exception { + testQuery(2); + } + + @Test (timeout = 300000) + public void testQueryWithMultipleNN() throws Exception { + testQuery(3); + } + + private void testQuery(int nnCount) throws Exception{ final Configuration conf = new Configuration(); MiniQJMHACluster cluster = null; try { - cluster = new MiniQJMHACluster.Builder(conf).build(); + cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build(); MiniDFSCluster dfsCluster = cluster.getDfsCluster(); dfsCluster.waitActive(); dfsCluster.transitionToActive(0); DistributedFileSystem dfs = dfsCluster.getFileSystem(0); - dfsCluster.shutdownNameNode(1); + // shutdown other NNs + for (int i = 1; i < nnCount; i++) { + dfsCluster.shutdownNameNode(i); + } // start rolling upgrade RollingUpgradeInfo info = dfs @@ -445,13 +469,16 @@ public class TestRollingUpgrade { info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY); Assert.assertFalse(info.createdRollbackImages()); - dfsCluster.restartNameNode(1); - + // restart other NNs + for (int i = 1; i < nnCount; i++) { + dfsCluster.restartNameNode(i); + } + // check that one of the other NNs has created the rollback image and uploaded it queryForPreparation(dfs); // The NN should have a copy of the fsimage in case of rollbacks. 
Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage() - .hasRollbackFSImage()); + .hasRollbackFSImage()); } finally { if (cluster != null) { cluster.shutdown(); @@ -487,6 +514,15 @@ public class TestRollingUpgrade { @Test(timeout = 300000) public void testCheckpoint() throws IOException, InterruptedException { + testCheckpoint(2); + } + + @Test(timeout = 300000) + public void testCheckpointWithMultipleNN() throws IOException, InterruptedException { + testCheckpoint(3); + } + + public void testCheckpoint(int nnCount) throws IOException, InterruptedException { final Configuration conf = new Configuration(); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1); @@ -495,7 +531,7 @@ public class TestRollingUpgrade { final Path foo = new Path("/foo"); try { - cluster = new MiniQJMHACluster.Builder(conf).build(); + cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build(); MiniDFSCluster dfsCluster = cluster.getDfsCluster(); dfsCluster.waitActive(); @@ -513,16 +549,9 @@ public class TestRollingUpgrade { long txid = dfs.rollEdits(); Assert.assertTrue(txid > 0); - int retries = 0; - while (++retries < 5) { - NNStorage storage = dfsCluster.getNamesystem(1).getFSImage() - .getStorage(); - if (storage.getFsImageName(txid - 1) != null) { - return; - } - Thread.sleep(1000); + for(int i=1; i< nnCount; i++) { + verifyNNCheckpoint(dfsCluster, txid, i); } - Assert.fail("new checkpoint does not exist"); } finally { if (cluster != null) { @@ -531,6 +560,22 @@ public class TestRollingUpgrade { } } + /** + * Verify that the namenode at the given index has an FSImage with a TxId up to txid-1 + */ + private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid, int nnIndex) throws InterruptedException { + int retries = 0; + while (++retries < 5) { + NNStorage storage = dfsCluster.getNamesystem(nnIndex).getFSImage() + .getStorage(); + if (storage.getFsImageName(txid - 1) != null) { + return; + } + Thread.sleep(1000); + } + Assert.fail("new checkpoint does not exist"); + } + static void queryForPreparation(DistributedFileSystem dfs) throws IOException, InterruptedException { RollingUpgradeInfo info; http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java index ef4c559..470a08b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java @@ -17,43 +17,39 @@ */ package org.apache.hadoop.hdfs.qjournal; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; - -import java.io.IOException; -import java.net.BindException; -import java.net.URI; -import java.util.Random; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; 
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; + +import java.io.IOException; +import java.net.BindException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; public class MiniQJMHACluster { private MiniDFSCluster cluster; private MiniJournalCluster journalCluster; private final Configuration conf; private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class); - + public static final String NAMESERVICE = "ns1"; - private static final String NN1 = "nn1"; - private static final String NN2 = "nn2"; private static final Random RANDOM = new Random(); private int basePort = 10000; public static class Builder { private final Configuration conf; private StartupOption startOpt = null; + private int numNNs = 2; private final MiniDFSCluster.Builder dfsBuilder; - + public Builder(Configuration conf) { this.conf = conf; // most QJMHACluster tests don't need DataNodes, so we'll make @@ -64,7 +60,7 @@ public class MiniQJMHACluster { public MiniDFSCluster.Builder getDfsBuilder() { return dfsBuilder; } - + public MiniQJMHACluster build() throws IOException { return new MiniQJMHACluster(this); } @@ -72,15 +68,25 @@ public class MiniQJMHACluster { public void startupOption(StartupOption startOpt) { this.startOpt = startOpt; } + + public Builder setNumNameNodes(int nns) { + this.numNNs = nns; + return this; + } + } + + public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) { + MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE); + for (int i = 0; i < nns; i++) { + nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setIpcPort(startingPort++) + .setHttpPort(startingPort++)); + } + + return new MiniDFSNNTopology().addNameservice(nameservice); } - + public static MiniDFSNNTopology createDefaultTopology(int basePort) { - return new MiniDFSNNTopology() - .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN( - new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort) - .setHttpPort(basePort + 1)).addNN( - new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2) - .setHttpPort(basePort + 3))); + return createDefaultTopology(2, basePort); } private MiniQJMHACluster(Builder builder) throws IOException { @@ -94,10 +100,10 @@ public class MiniQJMHACluster { .build(); URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE); - // start cluster with 2 NameNodes - MiniDFSNNTopology topology = createDefaultTopology(basePort); + // start cluster with specified NameNodes + MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, basePort); - initHAConf(journalURI, builder.conf); + initHAConf(journalURI, builder.conf, builder.numNNs); // First start up the NNs just to format the namespace. The MinIDFSCluster // has no way to just format the NameNodes without also starting them. 
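
A detail worth calling out from the new createDefaultTopology(int nns, int startingPort) above: each namenode claims two consecutive ports from the starting port, IPC first and HTTP second, which is why the reworked initHAConf below advances its RPC addresses by two per namenode. A small sketch of the resulting port layout (the base port and namenode count are example values only; the real cluster picks its own base port):

class QjmHaPortLayoutSketch {
  public static void main(String[] args) {
    int basePort = 10000; // example value; MiniQJMHACluster supplies its own
    int numNNs = 3;

    int port = basePort;
    for (int i = 0; i < numNNs; i++) {
      // Matches createDefaultTopology(nns, startingPort): IPC port first, then HTTP port.
      int ipcPort = port++;
      int httpPort = port++;
      System.out.println("nn" + i + ": rpc=127.0.0.1:" + ipcPort + " http=127.0.0.1:" + httpPort);
    }
  }
}
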
@@ -110,8 +116,9 @@ public class MiniQJMHACluster { Configuration confNN0 = cluster.getConfiguration(0); NameNode.initializeSharedEdits(confNN0, true); - cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt); - cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt); + for (MiniDFSCluster.NameNodeInfo nn : cluster.getNameNodeInfos()) { + nn.setStartOpt(builder.startOpt); + } // restart the cluster cluster.restartNameNodes(); @@ -123,31 +130,28 @@ public class MiniQJMHACluster { } } } - - private Configuration initHAConf(URI journalURI, Configuration conf) { + + private Configuration initHAConf(URI journalURI, Configuration conf, int numNNs) { conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, journalURI.toString()); - - String address1 = "127.0.0.1:" + basePort; - String address2 = "127.0.0.1:" + (basePort + 2); - conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, - NAMESERVICE, NN1), address1); - conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, - NAMESERVICE, NN2), address2); - conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE); - conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE), - NN1 + "," + NN2); - conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE, - ConfiguredFailoverProxyProvider.class.getName()); - conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE); - + + List nns = new ArrayList(numNNs); + int port = basePort; + for (int i = 0; i < numNNs; i++) { + nns.add("127.0.0.1:" + port); + // increment by 2 each time to account for the http port in the config setting + port += 2; + } + + // use standard failover configurations + HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns); return conf; } public MiniDFSCluster getDfsCluster() { return cluster; } - + public MiniJournalCluster getJournalCluster() { return journalCluster; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index d5a9426..b203872 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -162,7 +162,7 @@ public class TestBlockToken { public void testWritable() throws Exception { TestWritable.testWritable(new BlockTokenIdentifier()); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); TestWritable.testWritable(generateTokenId(sm, block1, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class))); TestWritable.testWritable(generateTokenId(sm, block2, @@ -201,7 +201,7 @@ public class TestBlockToken { @Test public void testBlockTokenSecretManager() throws Exception { BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( blockKeyUpdateInterval, 
blockTokenLifetime, "fake-pool", null); ExportedBlockKeys keys = masterHandler.exportKeys(); @@ -244,7 +244,7 @@ public class TestBlockToken { UserGroupInformation.setConfiguration(conf); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); @@ -283,7 +283,7 @@ public class TestBlockToken { Assume.assumeTrue(FD_DIR.exists()); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); @@ -352,7 +352,7 @@ public class TestBlockToken { for (int i = 0; i < 10; i++) { String bpid = Integer.toString(i); BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null); bpMgr.addBlockPool(bpid, slaveHandler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java index f01be4b..0818571 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java @@ -319,7 +319,7 @@ public class TestBackupNode { if(fileSys != null) fileSys.close(); if(cluster != null) cluster.shutdown(); } - File nnCurDir = new File(BASE_DIR, "name1/current/"); + File nnCurDir = new File(MiniDFSCluster.getNameNodeDirectory(BASE_DIR, 0, 0)[0], "current/"); File bnCurDir = new File(getBackupNodeDir(op, 1), "/current/"); FSImageTestUtil.assertParallelFilesAreIdentical( http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java index 5a51cb7..7073726 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java @@ -1428,7 +1428,8 @@ public class TestCheckpoint { // secondary = startSecondaryNameNode(conf); - File secondaryDir = new File(MiniDFSCluster.getBaseDirectory(), "namesecondary1"); + File secondaryDir = MiniDFSCluster.getCheckpointDirectory(MiniDFSCluster.getBaseDirectory(), + 0, 0)[0]; File secondaryCurrent = new File(secondaryDir, "current"); long expectedTxIdToDownload 
= cluster.getNameNode().getFSImage() http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java index 5b72901..a736d27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java @@ -42,7 +42,8 @@ public class HAStressTestHarness { private MiniDFSCluster cluster; static final int BLOCK_SIZE = 1024; final TestContext testCtx = new TestContext(); - + private int nns = 2; + public HAStressTestHarness() { conf = new Configuration(); conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); @@ -55,11 +56,19 @@ public class HAStressTestHarness { } /** + * Set the number of namenodes that should be run. This must be set before calling + * {@link #startCluster()} + */ + public void setNumberOfNameNodes(int nns) { + this.nns = nns; + } + + /** * Start and return the MiniDFSCluster. */ public MiniDFSCluster startCluster() throws IOException { cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .nnTopology(MiniDFSNNTopology.simpleHATopology(nns)) .numDataNodes(3) .build(); return cluster; @@ -99,28 +108,27 @@ public class HAStressTestHarness { } /** - * Add a thread which periodically triggers failover back and forth between - * the two namenodes. + * Add a thread which periodically triggers failover back and forth between the namenodes. */ public void addFailoverThread(final int msBetweenFailovers) { testCtx.addThread(new RepeatingTestThread(testCtx) { - @Override public void doAnAction() throws Exception { - System.err.println("==============================\n" + - "Failing over from 0->1\n" + - "=================================="); - cluster.transitionToStandby(0); - cluster.transitionToActive(1); - - Thread.sleep(msBetweenFailovers); - System.err.println("==============================\n" + - "Failing over from 1->0\n" + - "=================================="); - - cluster.transitionToStandby(1); - cluster.transitionToActive(0); - Thread.sleep(msBetweenFailovers); + // fail over from one namenode to the next, all the way back to the original NN + for (int i = 0; i < nns; i++) { + // next node, mod nns so we wrap to the 0th NN on the last iteration + int next = (i + 1) % nns; + System.err.println("==============================\n" + + "[Starting] Failing over from " + i + "->" + next + "\n" + + "=============================="); + cluster.transitionToStandby(i); + cluster.transitionToActive(next); + System.err.println("==============================\n" + + "[Completed] Failing over from " + i + "->" + next + ". 
Sleeping for "+ + (msBetweenFailovers/1000) +"sec \n" + + "=============================="); + Thread.sleep(msBetweenFailovers); + } } }); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index c7c4a77..5543a2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -24,9 +24,14 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -67,12 +72,11 @@ public abstract class HATestUtil { */ public static void waitForStandbyToCatchUp(NameNode active, NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException { - long activeTxId = active.getNamesystem().getFSImage().getEditLog() .getLastWrittenTxId(); - + active.getRpcServer().rollEditLog(); - + long start = Time.now(); while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) { long nn2HighestTxId = standby.getNamesystem().getFSImage() @@ -166,34 +170,52 @@ public abstract class HATestUtil { /** Sets the required configurations for performing failover. */ public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName, int nsIndex) { - InetSocketAddress nnAddr1 = cluster.getNameNode(2 * nsIndex).getNameNodeAddress(); - InetSocketAddress nnAddr2 = cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress(); - setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2); + MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); + List nnAddresses = new ArrayList(3); + for (MiniDFSCluster.NameNodeInfo nn : nns) { + nnAddresses.add(nn.nameNode.getNameNodeAddress()); + } + setFailoverConfigurations(conf, logicalName, nnAddresses); + } + + public static void setFailoverConfigurations(Configuration conf, String logicalName, + InetSocketAddress ... 
nnAddresses){ + setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses)); } /** * Sets the required configurations for performing failover */ public static void setFailoverConfigurations(Configuration conf, - String logicalName, InetSocketAddress nnAddr1, - InetSocketAddress nnAddr2) { - String nameNodeId1 = "nn1"; - String nameNodeId2 = "nn2"; - String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort(); - String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort(); - conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, - logicalName, nameNodeId1), address1); - conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, - logicalName, nameNodeId2), address2); - + String logicalName, List nnAddresses) { + setFailoverConfigurations(conf, logicalName, + Iterables.transform(nnAddresses, new Function() { + + // transform the inet address to a simple string + @Override + public String apply(InetSocketAddress addr) { + return "hdfs://" + addr.getHostName() + ":" + addr.getPort(); + } + })); + } + + public static void setFailoverConfigurations(Configuration conf, String logicalName, + Iterable nnAddresses) { + List nnids = new ArrayList(); + int i = 0; + for (String address : nnAddresses) { + String nnId = "nn" + (i + 1); + nnids.add(nnId); + conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address); + i++; + } conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName), - nameNodeId1 + "," + nameNodeId2); + Joiner.on(',').join(nnids)); conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, ConfiguredFailoverProxyProvider.class.getName()); conf.set("fs.defaultFS", "hdfs://" + logicalName); } - public static String getLogicalHostname(MiniDFSCluster cluster) { return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java index 7abc502..16dc766 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java @@ -46,37 +46,47 @@ import com.google.common.collect.ImmutableList; public class TestBootstrapStandby { private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class); - + + private static final int maxNNCount = 3; + private static final int STARTING_PORT = 20000; + private MiniDFSCluster cluster; private NameNode nn0; - + @Before public void setupCluster() throws IOException { Configuration conf = new Configuration(); - MiniDFSNNTopology topology = new MiniDFSNNTopology() - .addNameservice(new MiniDFSNNTopology.NSConf("ns1") - .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(20001)) - .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(20002))); - + // duplicate code with MiniQJMHACluster#createDefaultTopology, but don't want to cross + // dependencies or munge too much code to support it all correctly + 
MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("ns1"); + for (int i = 0; i < maxNNCount; i++) { + nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setHttpPort(STARTING_PORT + i + 1)); + } + + MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice); + cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(topology) - .numDataNodes(0) - .build(); + .nnTopology(topology) + .numDataNodes(0) + .build(); cluster.waitActive(); - + nn0 = cluster.getNameNode(0); cluster.transitionToActive(0); - cluster.shutdownNameNode(1); + // shutdown the other NNs + for (int i = 1; i < maxNNCount; i++) { + cluster.shutdownNameNode(i); + } } - + @After public void shutdownCluster() { if (cluster != null) { cluster.shutdown(); } } - + /** * Test for the base success case. The primary NN * hasn't made any checkpoints, and we copy the fsimage_0 @@ -85,30 +95,29 @@ public class TestBootstrapStandby { @Test public void testSuccessfulBaseCase() throws Exception { removeStandbyNameDirs(); - - try { - cluster.restartNameNode(1); - fail("Did not throw"); - } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains( - "storage directory does not exist or is not accessible", - ioe); + + // skip the first NN, its up + for (int index = 1; index < maxNNCount; index++) { + try { + cluster.restartNameNode(index); + fail("Did not throw"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "storage directory does not exist or is not accessible", ioe); + } + + int rc = BootstrapStandby.run(new String[] { "-nonInteractive" }, + cluster.getConfiguration(index)); + assertEquals(0, rc); + + // Should have copied over the namespace from the active + FSImageTestUtil.assertNNHasCheckpoints(cluster, index, ImmutableList.of(0)); } - - int rc = BootstrapStandby.run( - new String[]{"-nonInteractive"}, - cluster.getConfiguration(1)); - assertEquals(0, rc); - - // Should have copied over the namespace from the active - FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, - ImmutableList.of(0)); - FSImageTestUtil.assertNNFilesMatch(cluster); - // We should now be able to start the standby successfully. - cluster.restartNameNode(1); + // We should now be able to start the standbys successfully. + restartNameNodesFromIndex(1); } - + /** * Test for downloading a checkpoint made at a later checkpoint * from the active. @@ -123,21 +132,21 @@ public class TestBootstrapStandby { NameNodeAdapter.saveNamespace(nn0); NameNodeAdapter.leaveSafeMode(nn0); long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0) - .getFSImage().getMostRecentCheckpointTxId(); + .getFSImage().getMostRecentCheckpointTxId(); assertEquals(6, expectedCheckpointTxId); - int rc = BootstrapStandby.run( - new String[]{"-force"}, - cluster.getConfiguration(1)); - assertEquals(0, rc); - - // Should have copied over the namespace from the active - FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, - ImmutableList.of((int)expectedCheckpointTxId)); + for (int i = 1; i < maxNNCount; i++) { + assertEquals(0, forceBootstrap(i)); + + // Should have copied over the namespace from the active + LOG.info("Checking namenode: " + i); + FSImageTestUtil.assertNNHasCheckpoints(cluster, i, + ImmutableList.of((int) expectedCheckpointTxId)); + } FSImageTestUtil.assertNNFilesMatch(cluster); // We should now be able to start the standby successfully. 
- cluster.restartNameNode(1); + restartNameNodesFromIndex(1); } /** @@ -147,36 +156,40 @@ public class TestBootstrapStandby { @Test public void testSharedEditsMissingLogs() throws Exception { removeStandbyNameDirs(); - + CheckpointSignature sig = nn0.getRpcServer().rollEditLog(); assertEquals(3, sig.getCurSegmentTxId()); - + // Should have created edits_1-2 in shared edits dir - URI editsUri = cluster.getSharedEditsDir(0, 1); + URI editsUri = cluster.getSharedEditsDir(0, maxNNCount - 1); File editsDir = new File(editsUri); - File editsSegment = new File(new File(editsDir, "current"), + File currentDir = new File(editsDir, "current"); + File editsSegment = new File(currentDir, NNStorage.getFinalizedEditsFileName(1, 2)); GenericTestUtils.assertExists(editsSegment); + GenericTestUtils.assertExists(currentDir); // Delete the segment. assertTrue(editsSegment.delete()); - + // Trying to bootstrap standby should now fail since the edit // logs aren't available in the shared dir. LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(BootstrapStandby.class)); try { - int rc = BootstrapStandby.run( - new String[]{"-force"}, - cluster.getConfiguration(1)); - assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, rc); + assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, forceBootstrap(1)); } finally { logs.stopCapturing(); } GenericTestUtils.assertMatches(logs.getOutput(), "FATAL.*Unable to read transaction ids 1-3 from the configured shared"); } - + + /** + * Show that bootstrapping will fail on a given NameNode if its directories already exist. Its not + * run across all the NN because its testing the state local on each node. + * @throws Exception on unexpected failure + */ @Test public void testStandbyDirsAlreadyExist() throws Exception { // Should not pass since standby dirs exist, force not given @@ -186,12 +199,9 @@ public class TestBootstrapStandby { assertEquals(BootstrapStandby.ERR_CODE_ALREADY_FORMATTED, rc); // Should pass with -force - rc = BootstrapStandby.run( - new String[]{"-force"}, - cluster.getConfiguration(1)); - assertEquals(0, rc); + assertEquals(0, forceBootstrap(1)); } - + /** * Test that, even if the other node is not active, we are able * to bootstrap standby from it. @@ -199,18 +209,44 @@ public class TestBootstrapStandby { @Test(timeout=30000) public void testOtherNodeNotActive() throws Exception { cluster.transitionToStandby(0); - int rc = BootstrapStandby.run( - new String[]{"-force"}, - cluster.getConfiguration(1)); - assertEquals(0, rc); + assertSuccessfulBootstrapFromIndex(1); } private void removeStandbyNameDirs() { - for (URI u : cluster.getNameDirs(1)) { - assertTrue(u.getScheme().equals("file")); - File dir = new File(u.getPath()); - LOG.info("Removing standby dir " + dir); - assertTrue(FileUtil.fullyDelete(dir)); + for (int i = 1; i < maxNNCount; i++) { + for (URI u : cluster.getNameDirs(i)) { + assertTrue(u.getScheme().equals("file")); + File dir = new File(u.getPath()); + LOG.info("Removing standby dir " + dir); + assertTrue(FileUtil.fullyDelete(dir)); + } + } + } + + private void restartNameNodesFromIndex(int start) throws IOException { + for (int i = start; i < maxNNCount; i++) { + // We should now be able to start the standby successfully. 
+ cluster.restartNameNode(i, false); + } + + cluster.waitClusterUp(); + cluster.waitActive(); + } + + /** + * Force boot strapping on a namenode + * @param i index of the namenode to attempt + * @return exit code + * @throws Exception on unexpected failure + */ + private int forceBootstrap(int i) throws Exception { + return BootstrapStandby.run(new String[] { "-force" }, + cluster.getConfiguration(i)); + } + + private void assertSuccessfulBootstrapFromIndex(int start) throws Exception { + for (int i = start; i < maxNNCount; i++) { + assertEquals(0, forceBootstrap(i)); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java index ca8f563..db9a2de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java @@ -52,7 +52,8 @@ public class TestBootstrapStandbyWithQJM { private MiniDFSCluster cluster; private MiniJournalCluster jCluster; - + private int nnCount = 3; + @Before public void setup() throws Exception { Configuration conf = new Configuration(); @@ -62,7 +63,8 @@ public class TestBootstrapStandbyWithQJM { CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); - MiniQJMHACluster miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build(); + MiniQJMHACluster miniQjmHaCluster = + new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build(); cluster = miniQjmHaCluster.getDfsCluster(); jCluster = miniQjmHaCluster.getJournalCluster(); @@ -90,18 +92,7 @@ public class TestBootstrapStandbyWithQJM { public void testBootstrapStandbyWithStandbyNN() throws Exception { // make the first NN in standby state cluster.transitionToStandby(0); - Configuration confNN1 = cluster.getConfiguration(1); - - // shut down nn1 - cluster.shutdownNameNode(1); - - int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1); - assertEquals(0, rc); - - // Should have copied over the namespace from the standby - FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, - ImmutableList.of(0)); - FSImageTestUtil.assertNNFilesMatch(cluster); + bootstrapStandbys(); } /** BootstrapStandby when the existing NN is active */ @@ -109,17 +100,23 @@ public class TestBootstrapStandbyWithQJM { public void testBootstrapStandbyWithActiveNN() throws Exception { // make the first NN in active state cluster.transitionToActive(0); - Configuration confNN1 = cluster.getConfiguration(1); - - // shut down nn1 - cluster.shutdownNameNode(1); - - int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1); - assertEquals(0, rc); - - // Should have copied over the namespace from the standby - FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, - ImmutableList.of(0)); + bootstrapStandbys(); + } + + private void bootstrapStandbys() throws Exception { + // shutdown and bootstrap all the other nns, except the first (start 1, not 0) + for (int i = 1; i < nnCount; i++) { + Configuration otherNNConf = cluster.getConfiguration(i); + + // shut 
down other nn + cluster.shutdownNameNode(i); + + int rc = BootstrapStandby.run(new String[] { "-force" }, otherNNConf); + assertEquals(0, rc); + + // Should have copied over the namespace from the standby + FSImageTestUtil.assertNNHasCheckpoints(cluster, i, ImmutableList.of(0)); + } FSImageTestUtil.assertNNFilesMatch(cluster); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java index e7cba75..9164582 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java @@ -107,6 +107,7 @@ public class TestDNFencingWithReplication { @Test public void testFencingStress() throws Exception { HAStressTestHarness harness = new HAStressTestHarness(); + harness.setNumberOfNameNodes(3); harness.conf.setInt( DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); harness.conf.setInt( http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index 8c61c92..aea4f87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -113,7 +113,12 @@ public class TestEditLogTailer { public void testNN1TriggersLogRolls() throws Exception { testStandbyTriggersLogRolls(1); } - + + @Test + public void testNN2TriggersLogRolls() throws Exception { + testStandbyTriggersLogRolls(2); + } + private static void testStandbyTriggersLogRolls(int activeIndex) throws Exception { Configuration conf = new Configuration(); @@ -125,7 +130,8 @@ public class TestEditLogTailer { MiniDFSNNTopology topology = new MiniDFSNNTopology() .addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10031)) - .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10032))); + .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10032)) + .addNN(new MiniDFSNNTopology.NNConf("nn3").setIpcPort(10033))); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .nnTopology(topology) @@ -145,7 +151,7 @@ public class TestEditLogTailer { private static void waitForLogRollInSharedDir(MiniDFSCluster cluster, long startTxId) throws Exception { - URI sharedUri = cluster.getSharedEditsDir(0, 1); + URI sharedUri = cluster.getSharedEditsDir(0, 2); File sharedDir = new File(sharedUri.getPath(), "current"); final File expectedLog = new File(sharedDir, NNStorage.getInProgressEditsFileName(startTxId)); 
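A note for readers following the test changes above: rather than hard-coding two NameNodes ("nn1"/"nn2"), the patch builds the topology in a loop so tests can run against any NameNode count. Below is a minimal sketch of that pattern, assuming illustrative names (NUM_NAMENODES, BASE_PORT, MultiNNTopologyExample); only the MiniDFSNNTopology and MiniDFSCluster.Builder calls are taken from the patch itself, which uses maxNNCount = 3 and a starting port of 20000 in TestBootstrapStandby.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;

public class MultiNNTopologyExample {
  // Illustrative constants, not part of the patch.
  private static final int NUM_NAMENODES = 3;
  private static final int BASE_PORT = 20000;

  public static MiniDFSCluster start(Configuration conf) throws IOException {
    // One nameservice with NUM_NAMENODES NameNodes, each on its own HTTP port,
    // mirroring the loop the patch adds in TestBootstrapStandby#setupCluster.
    MiniDFSNNTopology.NSConf ns = new MiniDFSNNTopology.NSConf("ns1");
    for (int i = 0; i < NUM_NAMENODES; i++) {
      ns.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setHttpPort(BASE_PORT + i + 1));
    }
    MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(ns);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(topology)
        .numDataNodes(0)
        .build();
    cluster.waitActive();
    return cluster;
  }
}

With a topology built this way, helpers that previously assumed exactly two NameNodes (for example the shared-edits lookup, now getSharedEditsDir(0, nnCount - 1) instead of getSharedEditsDir(0, 1)) can span all configured NameNodes.
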
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java index 151e7d3..116079a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java @@ -56,10 +56,11 @@ public class TestFailoverWithBlockTokensEnabled { private static final Path TEST_PATH = new Path("/test-path"); private static final String TEST_DATA = "very important text"; - + private static final int numNNs = 3; + private Configuration conf; private MiniDFSCluster cluster; - + @Before public void startCluster() throws IOException { conf = new Configuration(); @@ -67,7 +68,7 @@ public class TestFailoverWithBlockTokensEnabled { // Set short retry timeouts so this test runs faster conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .nnTopology(MiniDFSNNTopology.simpleHATopology(numNNs)) .numDataNodes(1) .build(); } @@ -78,33 +79,41 @@ public class TestFailoverWithBlockTokensEnabled { cluster.shutdown(); } } - + @Test public void ensureSerialNumbersNeverOverlap() { BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager() .getBlockTokenSecretManager(); BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager() .getBlockTokenSecretManager(); - - btsm1.setSerialNo(0); - btsm2.setSerialNo(0); - assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); - - btsm1.setSerialNo(Integer.MAX_VALUE); - btsm2.setSerialNo(Integer.MAX_VALUE); - assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); - - btsm1.setSerialNo(Integer.MIN_VALUE); - btsm2.setSerialNo(Integer.MIN_VALUE); - assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); - - btsm1.setSerialNo(Integer.MAX_VALUE / 2); - btsm2.setSerialNo(Integer.MAX_VALUE / 2); - assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + BlockTokenSecretManager btsm3 = cluster.getNamesystem(2).getBlockManager() + .getBlockTokenSecretManager(); + + setAndCheckSerialNumber(0, btsm1, btsm2, btsm3); + setAndCheckSerialNumber(Integer.MAX_VALUE, btsm1, btsm2, btsm3); + setAndCheckSerialNumber(Integer.MIN_VALUE, btsm1, btsm2, btsm3); + setAndCheckSerialNumber(Integer.MAX_VALUE / 2, btsm1, btsm2, btsm3); + setAndCheckSerialNumber(Integer.MIN_VALUE / 2, btsm1, btsm2, btsm3); + setAndCheckSerialNumber(Integer.MAX_VALUE / 3, btsm1, btsm2, btsm3); + setAndCheckSerialNumber(Integer.MIN_VALUE / 3, btsm1, btsm2, btsm3); + } + + private void setAndCheckSerialNumber(int serialNumber, BlockTokenSecretManager... 
btsms) { + for (BlockTokenSecretManager btsm : btsms) { + btsm.setSerialNo(serialNumber); + } - btsm1.setSerialNo(Integer.MIN_VALUE / 2); - btsm2.setSerialNo(Integer.MIN_VALUE / 2); - assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + for (int i = 0; i < btsms.length; i++) { + for (int j = 0; j < btsms.length; j++) { + if (j == i) { + continue; + } + int first = btsms[i].getSerialNoForTesting(); + int second = btsms[j].getSerialNoForTesting(); + assertFalse("Overlap found for set serial number (" + serialNumber + ") is " + i + ": " + + first + " == " + j + ": " + second, first == second); + } + } } @Test
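For context on the HATestUtil change earlier in this patch: the generalized setFailoverConfigurations derives NameNode ids nn1..nnN from the supplied addresses and joins them into dfs.ha.namenodes, instead of writing out exactly two entries. The sketch below shows the key layout that results for an arbitrary NameNode count; the class name FailoverConfExample is illustrative, while the key names and the "hdfs://host:port" address format mirror the patched test code.

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

public class FailoverConfExample {
  // Populate client-side failover keys for any number of NameNodes,
  // in the same spirit as the generalized HATestUtil#setFailoverConfigurations.
  public static Configuration buildConf(String logicalName, List<InetSocketAddress> addrs) {
    Configuration conf = new Configuration();
    List<String> nnIds = new ArrayList<>();
    int i = 0;
    for (InetSocketAddress addr : addrs) {
      String nnId = "nn" + (i + 1);
      nnIds.add(nnId);
      // One RPC address key per NameNode, suffixed with nameservice and NN id.
      conf.set("dfs.namenode.rpc-address." + logicalName + "." + nnId,
          "hdfs://" + addr.getHostName() + ":" + addr.getPort());
      i++;
    }
    conf.set("dfs.nameservices", logicalName);
    conf.set("dfs.ha.namenodes." + logicalName, String.join(",", nnIds));
    conf.set("dfs.client.failover.proxy.provider." + logicalName,
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    conf.set("fs.defaultFS", "hdfs://" + logicalName);
    return conf;
  }
}

For three NameNodes this yields dfs.ha.namenodes.<logicalName> = nn1,nn2,nn3 plus one rpc-address key per NameNode, which is what lets the tests above fail over round-robin across all configured NameNodes instead of just flipping between two.
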