hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [14/21] hadoop git commit: HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.
Date Wed, 24 Jun 2015 17:50:52 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index fdbacdc..0a21886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -62,6 +62,8 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -445,7 +447,7 @@ public class MiniDFSCluster {
     final int numNameNodes = builder.nnTopology.countNameNodes();
     LOG.info("starting cluster: numNameNodes=" + numNameNodes
         + ", numDataNodes=" + builder.numDataNodes);
-    nameNodes = new NameNodeInfo[numNameNodes];
+
     this.storagesPerDatanode = builder.storagesPerDatanode;
 
     // Duplicate the storageType setting for each DN.
@@ -515,7 +517,7 @@ public class MiniDFSCluster {
   }
 
   private Configuration conf;
-  private NameNodeInfo[] nameNodes;
+  private Multimap<String, NameNodeInfo> namenodes = ArrayListMultimap.create();
   protected int numDataNodes;
   protected final List<DataNodeProperties> dataNodes =
                          new ArrayList<DataNodeProperties>();
@@ -539,10 +541,10 @@ public class MiniDFSCluster {
    * Stores the information related to a namenode in the cluster
    */
   public static class NameNodeInfo {
-    final NameNode nameNode;
-    final Configuration conf;
-    final String nameserviceId;
-    final String nnId;
+    public NameNode nameNode;
+    Configuration conf;
+    String nameserviceId;
+    String nnId;
     StartupOption startOpt;
     NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
         StartupOption startOpt, Configuration conf) {
@@ -563,7 +565,6 @@ public class MiniDFSCluster {
    * without a name node (ie when the name node is started elsewhere).
    */
   public MiniDFSCluster() {
-    nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
     storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     synchronized (MiniDFSCluster.class) {
       instanceId = instanceCount++;
@@ -740,7 +741,6 @@ public class MiniDFSCluster {
                         StartupOption operation,
                         String[] racks, String hosts[],
                         long[] simulatedCapacities) throws IOException {
-    this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
     this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     initMiniDFSCluster(conf, numDataNodes, null, format,
                        manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
@@ -814,7 +814,7 @@ public class MiniDFSCluster {
         createNameNodesAndSetConf(
             nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
             enableManagedDfsDirsRedundancy,
-            format, startOpt, clusterId, conf);
+            format, startOpt, clusterId);
       } catch (IOException ioe) {
         LOG.error("IOE creating namenodes. Permissions dump:\n" +
             createPermissionsDiagnosisString(data_dir), ioe);
@@ -871,7 +871,127 @@ public class MiniDFSCluster {
   private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
       boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
       boolean enableManagedDfsDirsRedundancy, boolean format,
+      StartupOption operation, String clusterId) throws IOException {
+    // do the basic namenode configuration
+    configureNameNodes(nnTopology, federation, conf);
+
+    int nnCounter = 0;
+    int nsCounter = 0;
+    // configure each NS independently
+    for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
+      configureNameService(nameservice, nsCounter++, manageNameDfsSharedDirs,
+          manageNameDfsDirs, enableManagedDfsDirsRedundancy,
+          format, operation, clusterId, nnCounter);
+      nnCounter += nameservice.getNNs().size();
+    }
+  }
+
+  /**
+   * Do the rest of the NN configuration for things like shared edits,
+   * as well as directory formatting, etc. for a single nameservice
+   * @param nnCounter the count of the number of namenodes already configured/started. Also,
+   *                  acts as the <i>index</i> to the next NN to start (since indices start at 0).
+   * @throws IOException
+   */
+  private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCounter,
+      boolean manageNameDfsSharedDirs, boolean manageNameDfsDirs, boolean
+      enableManagedDfsDirsRedundancy, boolean format,
       StartupOption operation, String clusterId,
+      final int nnCounter) throws IOException{
+    String nsId = nameservice.getId();
+    String lastDefaultFileSystem = null;
+
+    // If HA is enabled on this nameservice (more than one NN) and the shared dirs
+    // are managed by the cluster, set a shared edits dir and clean it out
+    int numNNs = nameservice.getNNs().size();
+    if (numNNs > 1 && manageNameDfsSharedDirs) {
+      URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter + numNNs - 1);
+      conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
+      // Clean out the shared edits dir completely, including all subdirectories.
+      FileUtil.fullyDelete(new File(sharedEditsUri));
+    }
+
+    // Now format first NN and copy the storage directory from that node to the others.
+    int nnIndex = nnCounter;
+    Collection<URI> prevNNDirs = null;
+    for (NNConf nn : nameservice.getNNs()) {
+      initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
+          manageNameDfsDirs,  nnIndex);
+      Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+      if (format) {
+        // delete the existing namespaces
+        for (URI nameDirUri : namespaceDirs) {
+          File nameDir = new File(nameDirUri);
+          if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
+            throw new IOException("Could not fully delete " + nameDir);
+          }
+        }
+
+        // delete the checkpoint directories, if they exist
+        Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
+            .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
+        for (URI checkpointDirUri : checkpointDirs) {
+          File checkpointDir = new File(checkpointDirUri);
+          if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
+            throw new IOException("Could not fully delete " + checkpointDir);
+          }
+        }
+      }
+
+      boolean formatThisOne = format;
+      // if this is not the first NN of the nameservice
+      if (nnIndex++ > nnCounter && format) {
+        // Don't format the second, third, etc NN in an HA setup - that
+        // would result in it having a different clusterID,
+        // block pool ID, etc. Instead, copy the name dirs
+        // from the previous one.
+        formatThisOne = false;
+        assert (null != prevNNDirs);
+        copyNameDirs(prevNNDirs, namespaceDirs, conf);
+      }
+
+      if (formatThisOne) {
+        // Allow overriding clusterID for specific NNs to test
+        // misconfiguration.
+        if (nn.getClusterId() == null) {
+          StartupOption.FORMAT.setClusterId(clusterId);
+        } else {
+          StartupOption.FORMAT.setClusterId(nn.getClusterId());
+        }
+        DFSTestUtil.formatNameNode(conf);
+      }
+      prevNNDirs = namespaceDirs;
+    }
+
+    // create all the namenodes in the nameservice
+    nnIndex = nnCounter;
+    for (NNConf nn : nameservice.getNNs()) {
+      initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
+          enableManagedDfsDirsRedundancy, nnIndex++);
+      NameNodeInfo info = createNameNode(conf, false, operation,
+          clusterId, nsId, nn.getNnId());
+
+      // Record the last namenode uri
+      if (info != null && info.conf != null) {
+        lastDefaultFileSystem =
+            info.conf.get(FS_DEFAULT_NAME_KEY);
+      }
+    }
+    if (!federation && lastDefaultFileSystem != null) {
+      // Set the default file system to the actual bind address of NN.
+      conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
+    }
+  }
+
+  /**
+   * Do the basic NN configuration for the topology. Does not configure things like the shared
+   * edits directories.
+   * @param nnTopology
+   * @param federation
+   * @param conf
+   * @throws IOException
+   */
+  public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation,
       Configuration conf) throws IOException {
     Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
         "empty NN topology: no namenodes specified!");
@@ -884,22 +1004,21 @@ public class MiniDFSCluster {
       // NN is started.
       conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:" + onlyNN.getIpcPort());
     }
-    
+
     List<String> allNsIds = Lists.newArrayList();
     for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
       if (nameservice.getId() != null) {
         allNsIds.add(nameservice.getId());
       }
     }
+
     if (!allNsIds.isEmpty()) {
       conf.set(DFS_NAMESERVICES, Joiner.on(",").join(allNsIds));
     }
-    
-    int nnCounter = 0;
+
     for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
       String nsId = nameservice.getId();
-      String lastDefaultFileSystem = null;
-      
+
       Preconditions.checkArgument(
           !federation || nsId != null,
           "if there is more than one NS, they must have names");
@@ -918,85 +1037,10 @@ public class MiniDFSCluster {
       // If HA is enabled on this nameservice, enumerate all the namenodes
       // in the configuration. Also need to set a shared edits dir
       if (nnIds.size() > 1) {
-        conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
-            Joiner.on(",").join(nnIds));
-        if (manageNameDfsSharedDirs) {
-          URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1); 
-          conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
-          // Clean out the shared edits dir completely, including all subdirectories.
-          FileUtil.fullyDelete(new File(sharedEditsUri));
-        }
-      }
-
-      // Now format first NN and copy the storage directory from that node to the others.
-      int i = 0;
-      Collection<URI> prevNNDirs = null;
-      int nnCounterForFormat = nnCounter;
-      for (NNConf nn : nameservice.getNNs()) {
-        initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs,
-            nnCounterForFormat);
-        Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
-        if (format) {
-          for (URI nameDirUri : namespaceDirs) {
-            File nameDir = new File(nameDirUri);
-            if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
-              throw new IOException("Could not fully delete " + nameDir);
-            }
-          }
-          Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
-              .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
-          for (URI checkpointDirUri : checkpointDirs) {
-            File checkpointDir = new File(checkpointDirUri);
-            if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
-              throw new IOException("Could not fully delete " + checkpointDir);
-            }
-          }
-        }
-        
-        boolean formatThisOne = format;
-        if (format && i++ > 0) {
-          // Don't format the second NN in an HA setup - that
-          // would result in it having a different clusterID,
-          // block pool ID, etc. Instead, copy the name dirs
-          // from the first one.
-          formatThisOne = false;
-          assert (null != prevNNDirs);
-          copyNameDirs(prevNNDirs, namespaceDirs, conf);
-        }
-        
-        nnCounterForFormat++;
-        if (formatThisOne) {
-          // Allow overriding clusterID for specific NNs to test
-          // misconfiguration.
-          if (nn.getClusterId() == null) {
-            StartupOption.FORMAT.setClusterId(clusterId);
-          } else {
-            StartupOption.FORMAT.setClusterId(nn.getClusterId());
-          }
-          DFSTestUtil.formatNameNode(conf);
-        }
-        prevNNDirs = namespaceDirs;
-      }
-
-      // Start all Namenodes
-      for (NNConf nn : nameservice.getNNs()) {
-        initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
-            enableManagedDfsDirsRedundancy, nnCounter);
-        createNameNode(nnCounter, conf, numDataNodes, false, operation,
-            clusterId, nsId, nn.getNnId());
-        // Record the last namenode uri
-        if (nameNodes[nnCounter] != null && nameNodes[nnCounter].conf != null) {
-          lastDefaultFileSystem =
-              nameNodes[nnCounter].conf.get(FS_DEFAULT_NAME_KEY);
-        }
-        nnCounter++;
-      }
-      if (!federation && lastDefaultFileSystem != null) {
-        // Set the default file system to the actual bind address of NN.
-        conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
+        conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner
+            .on(",").join(nnIds));
       }
     }
-
   }
   
   public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
@@ -1010,39 +1054,92 @@ public class MiniDFSCluster {
   }
   
   public NameNodeInfo[] getNameNodeInfos() {
-    return this.nameNodes;
+    return this.namenodes.values().toArray(new NameNodeInfo[0]);
   }
 
-  private void initNameNodeConf(Configuration conf,
-      String nameserviceId, String nnId,
-      boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy,
-      int nnIndex) throws IOException {
+  /**
+   * @param nsIndex index of the nameservice to look up
+   * @return all the namenodes bound to the nameservice at the given index, or null if none matches
+   */
+  public NameNodeInfo[] getNameNodeInfos(int nsIndex) {
+    int i = 0;
+    for (String ns : this.namenodes.keys()) {
+      if (i++ == nsIndex) {
+        return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @param nameservice id of the nameservice to read
+   * @return all the namenodes bound to the given nameservice id, or null if the id is not found
+   */
+  public NameNodeInfo[] getNameNodeInfos(String nameservice) {
+    for (String ns : this.namenodes.keys()) {
+      if (nameservice.equals(ns)) {
+        return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
+      }
+    }
+    return null;
+  }
+
+
+  private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
+      boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
+      throws IOException {
     if (nameserviceId != null) {
       conf.set(DFS_NAMESERVICE_ID, nameserviceId);
     }
     if (nnId != null) {
       conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
     }
-    
     if (manageNameDfsDirs) {
       if (enableManagedDfsDirsRedundancy) {
-        conf.set(DFS_NAMENODE_NAME_DIR_KEY,
-            fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
-            fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
-        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
-            fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
-            fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+        File[] files = getNameNodeDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
+        files = getCheckpointDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
       } else {
-        conf.set(DFS_NAMENODE_NAME_DIR_KEY,
-            fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).
-              toString());
-        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
-            fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).
-              toString());
+        File[] files = getNameNodeDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]).toString());
+        files = getCheckpointDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]).toString());
       }
     }
   }
 
+  private File[] getNameNodeDirectory(int nameserviceIndex, int nnIndex) {
+    return getNameNodeDirectory(base_dir, nameserviceIndex, nnIndex);
+  }
+
+  public static File[] getNameNodeDirectory(String base_dir, int nsIndex, int nnIndex) {
+    return getNameNodeDirectory(new File(base_dir), nsIndex, nnIndex);
+  }
+
+  public static File[] getNameNodeDirectory(File base_dir, int nsIndex, int nnIndex) {
+    File[] files = new File[2];
+    files[0] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 1));
+    files[1] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 2));
+    return files;
+  }
+
+  public File[] getCheckpointDirectory(int nsIndex, int nnIndex) {
+    return getCheckpointDirectory(base_dir, nsIndex, nnIndex);
+  }
+
+  public static File[] getCheckpointDirectory(String base_dir, int nsIndex, int nnIndex) {
+    return getCheckpointDirectory(new File(base_dir), nsIndex, nnIndex);
+  }
+
+  public static File[] getCheckpointDirectory(File base_dir, int nsIndex, int nnIndex) {
+    File[] files = new File[2];
+    files[0] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 1));
+    files[1] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 2));
+    return files;
+  }
+
+
   public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
       Configuration dstConf) throws IOException {
     URI srcDir = Lists.newArrayList(srcDirs).get(0);
@@ -1094,12 +1191,9 @@ public class MiniDFSCluster {
             new String[] {} : new String[] {operation.getName()};
     return args;
   }
-  
-  private void createNameNode(int nnIndex, Configuration conf,
-      int numDataNodes, boolean format, StartupOption operation,
-      String clusterId, String nameserviceId,
-      String nnId)
-      throws IOException {
+
+  private NameNodeInfo createNameNode(Configuration conf, boolean format, StartupOption operation,
+      String clusterId, String nameserviceId, String nnId) throws IOException {
     // Format and clean out DataNode directories
     if (format) {
       DFSTestUtil.formatNameNode(conf);
@@ -1113,7 +1207,7 @@ public class MiniDFSCluster {
     String[] args = createArgs(operation);
     NameNode nn =  NameNode.createNameNode(args, conf);
     if (operation == StartupOption.RECOVER) {
-      return;
+      return null;
     }
     
     // After the NN has started, set back the bound ports into
@@ -1131,14 +1225,17 @@ public class MiniDFSCluster {
 
     DFSUtil.setGenericConf(conf, nameserviceId, nnId,
         DFS_NAMENODE_HTTP_ADDRESS_KEY);
-    nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
+    NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
         operation, new Configuration(conf));
+    namenodes.put(nameserviceId, info);
+
     // Restore the default fs name
     if (originalDefaultFs == null) {
       conf.set(FS_DEFAULT_NAME_KEY, "");
     } else {
       conf.set(FS_DEFAULT_NAME_KEY, originalDefaultFs);
     }
+    return info;
   }
 
   /**
@@ -1154,7 +1251,7 @@ public class MiniDFSCluster {
    */
   public URI getURI(int nnIndex) {
     String hostPort =
-        nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString();
+        getNN(nnIndex).nameNode.getNameNodeAddressHostPortString();
     URI uri = null;
     try {
       uri = new URI("hdfs://" + hostPort);
@@ -1172,9 +1269,21 @@ public class MiniDFSCluster {
    * @return Configuration of for the given namenode
    */
   public Configuration getConfiguration(int nnIndex) {
-    return nameNodes[nnIndex].conf;
+    return getNN(nnIndex).conf;
   }
 
+  private NameNodeInfo getNN(int nnIndex) {
+    int count = 0;
+    for (NameNodeInfo nn : namenodes.values()) {
+      if (count == nnIndex) {
+        return nn;
+      }
+      count++;
+    }
+    return null;
+  }
+
+
   /**
    * wait for the given namenode to get out of safemode.
    */
@@ -1593,7 +1702,7 @@ public class MiniDFSCluster {
    * @throws Exception
    */
   public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
-    finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
+    finalizeNamenode(getNN(nnIndex).nameNode, getNN(nnIndex).conf);
   }
 
   /**
@@ -1604,7 +1713,7 @@ public class MiniDFSCluster {
    * @throws IllegalStateException if the Namenode is not running.
    */
   public void finalizeCluster(Configuration conf) throws Exception {
-    for (NameNodeInfo nnInfo : nameNodes) {
+    for (NameNodeInfo nnInfo : namenodes.values()) {
       if (nnInfo == null) {
         throw new IllegalStateException("Attempting to finalize "
             + "Namenode but it is not running");
@@ -1612,9 +1721,9 @@ public class MiniDFSCluster {
       finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
     }
   }
-  
+
   public int getNumNameNodes() {
-    return nameNodes.length;
+    return namenodes.size();
   }
   
   /**
@@ -1644,7 +1753,7 @@ public class MiniDFSCluster {
    * Gets the NameNode for the index.  May be null.
    */
   public NameNode getNameNode(int nnIndex) {
-    return nameNodes[nnIndex].nameNode;
+    return getNN(nnIndex).nameNode;
   }
   
   /**
@@ -1653,11 +1762,11 @@ public class MiniDFSCluster {
    */
   public FSNamesystem getNamesystem() {
     checkSingleNameNode();
-    return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
+    return NameNodeAdapter.getNamesystem(getNN(0).nameNode);
   }
-  
+
   public FSNamesystem getNamesystem(int nnIndex) {
-    return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
+    return NameNodeAdapter.getNamesystem(getNN(nnIndex).nameNode);
   }
 
   /**
@@ -1697,14 +1806,14 @@ public class MiniDFSCluster {
    * caller supplied port is not necessarily the actual port used.
    */     
   public int getNameNodePort(int nnIndex) {
-    return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
+    return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
   }
 
   /**
    * @return the service rpc port used by the NameNode at the given index.
    */     
   public int getNameNodeServicePort(int nnIndex) {
-    return nameNodes[nnIndex].nameNode.getServiceRpcAddress().getPort();
+    return getNN(nnIndex).nameNode.getServiceRpcAddress().getPort();
   }
     
   /**
@@ -1745,7 +1854,7 @@ public class MiniDFSCluster {
       fileSystems.clear();
     }
     shutdownDataNodes();
-    for (NameNodeInfo nnInfo : nameNodes) {
+    for (NameNodeInfo nnInfo : namenodes.values()) {
       if (nnInfo == null) continue;
       NameNode nameNode = nnInfo.nameNode;
       if (nameNode != null) {
@@ -1781,7 +1890,7 @@ public class MiniDFSCluster {
    * Shutdown all the namenodes.
    */
   public synchronized void shutdownNameNodes() {
-    for (int i = 0; i < nameNodes.length; i++) {
+    for (int i = 0; i < namenodes.size(); i++) {
       shutdownNameNode(i);
     }
   }
@@ -1790,13 +1899,15 @@ public class MiniDFSCluster {
    * Shutdown the namenode at a given index.
    */
   public synchronized void shutdownNameNode(int nnIndex) {
-    NameNode nn = nameNodes[nnIndex].nameNode;
+    NameNodeInfo info = getNN(nnIndex);
+    NameNode nn = info.nameNode;
     if (nn != null) {
       LOG.info("Shutting down the namenode");
       nn.stop();
       nn.join();
-      Configuration conf = nameNodes[nnIndex].conf;
-      nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
+      info.nnId = null;
+      info.nameNode = null;
+      info.nameserviceId = null;
     }
   }
   
@@ -1804,7 +1915,7 @@ public class MiniDFSCluster {
    * Restart all namenodes.
    */
   public synchronized void restartNameNodes() throws IOException {
-    for (int i = 0; i < nameNodes.length; i++) {
+    for (int i = 0; i < namenodes.size(); i++) {
       restartNameNode(i, false);
     }
     waitActive();
@@ -1840,19 +1951,19 @@ public class MiniDFSCluster {
    */
   public synchronized void restartNameNode(int nnIndex, boolean waitActive,
       String... args) throws IOException {
-    String nameserviceId = nameNodes[nnIndex].nameserviceId;
-    String nnId = nameNodes[nnIndex].nnId;
-    StartupOption startOpt = nameNodes[nnIndex].startOpt;
-    Configuration conf = nameNodes[nnIndex].conf;
+    NameNodeInfo info = getNN(nnIndex);
+    StartupOption startOpt = info.startOpt;
+
     shutdownNameNode(nnIndex);
     if (args.length != 0) {
       startOpt = null;
     } else {
       args = createArgs(startOpt);
     }
-    NameNode nn = NameNode.createNameNode(args, conf);
-    nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
-        conf);
+
+    NameNode nn = NameNode.createNameNode(args, info.conf);
+    info.nameNode = nn;
+    info.setStartOpt(startOpt);
     if (waitActive) {
       waitClusterUp();
       LOG.info("Restarted the namenode");
@@ -2124,7 +2235,7 @@ public class MiniDFSCluster {
    * or if waiting for safe mode is disabled.
    */
   public boolean isNameNodeUp(int nnIndex) {
-    NameNode nameNode = nameNodes[nnIndex].nameNode;
+    NameNode nameNode = getNN(nnIndex).nameNode;
     if (nameNode == null) {
       return false;
     }
@@ -2142,7 +2253,7 @@ public class MiniDFSCluster {
    * Returns true if all the NameNodes are running and is out of Safe Mode.
    */
   public boolean isClusterUp() {
-    for (int index = 0; index < nameNodes.length; index++) {
+    for (int index = 0; index < namenodes.size(); index++) {
       if (!isNameNodeUp(index)) {
         return false;
       }
@@ -2172,15 +2283,13 @@ public class MiniDFSCluster {
     checkSingleNameNode();
     return getFileSystem(0);
   }
-  
+
   /**
    * Get a client handle to the DFS cluster for the namenode at given index.
    */
   public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
-    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
-        getURI(nnIndex), nameNodes[nnIndex].conf);
-    fileSystems.add(dfs);
-    return dfs;
+    return (DistributedFileSystem) addFileSystem(FileSystem.get(getURI(nnIndex),
+        getNN(nnIndex).conf));
   }
 
   /**
@@ -2188,17 +2297,20 @@ public class MiniDFSCluster {
    * This simulating different threads working on different FileSystem instances.
    */
   public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
-    FileSystem dfs = FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
-    fileSystems.add(dfs);
-    return dfs;
+    return addFileSystem(FileSystem.newInstance(getURI(nnIndex), getNN(nnIndex).conf));
   }
-  
+
+  private <T extends FileSystem> T addFileSystem(T fs) {
+    fileSystems.add(fs);
+    return fs;
+  }
+
   /**
    * @return a http URL
    */
   public String getHttpUri(int nnIndex) {
     return "http://"
-        + nameNodes[nnIndex].conf
+        + getNN(nnIndex).conf
             .get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
   }
 
@@ -2206,14 +2318,14 @@ public class MiniDFSCluster {
    * Get the directories where the namenode stores its image.
    */
   public Collection<URI> getNameDirs(int nnIndex) {
-    return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
+    return FSNamesystem.getNamespaceDirs(getNN(nnIndex).conf);
   }
 
   /**
    * Get the directories where the namenode stores its edits.
    */
   public Collection<URI> getNameEditsDirs(int nnIndex) throws IOException {
-    return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
+    return FSNamesystem.getNamespaceEditsDirs(getNN(nnIndex).conf);
   }
   
   public void transitionToActive(int nnIndex) throws IOException,
@@ -2254,11 +2366,12 @@ public class MiniDFSCluster {
 
   /** Wait until the given namenode gets registration from all the datanodes */
   public void waitActive(int nnIndex) throws IOException {
-    if (nameNodes.length == 0 || nameNodes[nnIndex] == null
-        || nameNodes[nnIndex].nameNode == null) {
+    if (namenodes.size() == 0 || getNN(nnIndex) == null || getNN(nnIndex).nameNode == null) {
       return;
     }
-    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
+
+    NameNodeInfo info = getNN(nnIndex);
+    InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
     assert addr.getPort() != 0;
     DFSClient client = new DFSClient(addr, conf);
 
@@ -2278,7 +2391,7 @@ public class MiniDFSCluster {
    * Wait until the cluster is active and running.
    */
   public void waitActive() throws IOException {
-    for (int index = 0; index < nameNodes.length; index++) {
+    for (int index = 0; index < namenodes.size(); index++) {
       int failedCount = 0;
       while (true) {
         try {
@@ -2298,7 +2411,14 @@ public class MiniDFSCluster {
     }
     LOG.info("Cluster is active");
   }
-  
+
+  public void printNNs() {
+    for (int i = 0; i < namenodes.size(); i++) {
+      LOG.info("Have namenode " + i + ", info:" + getNN(i));
+      LOG.info(" has namenode: " + getNN(i).nameNode);
+    }
+  }
+
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
       InetSocketAddress addr) {
     // If a datanode failed to start, then do not wait
@@ -2696,7 +2816,7 @@ public class MiniDFSCluster {
    * namenode
    */
   private void checkSingleNameNode() {
-    if (nameNodes.length != 1) {
+    if (namenodes.size() != 1) {
       throw new IllegalArgumentException("Namenode index is needed");
     }
   }
@@ -2712,13 +2832,9 @@ public class MiniDFSCluster {
     if(!federation)
       throw new IOException("cannot add namenode to non-federated cluster");
 
-    int nnIndex = nameNodes.length;
-    int numNameNodes = nameNodes.length + 1;
-    NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
-    System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
-    nameNodes = newlist;
-    String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1);
-    
+    int nameServiceIndex = namenodes.keys().size();
+    String nameserviceId = NAMESERVICE_ID_PREFIX + (namenodes.keys().size() + 1);
+
     String nameserviceIds = conf.get(DFS_NAMESERVICES);
     nameserviceIds += "," + nameserviceId;
     conf.set(DFS_NAMESERVICES, nameserviceIds);
@@ -2726,9 +2842,11 @@ public class MiniDFSCluster {
     String nnId = null;
     initNameNodeAddress(conf, nameserviceId,
         new NNConf(nnId).setIpcPort(namenodePort));
-    initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex);
-    createNameNode(nnIndex, conf, numDataNodes, true, null, null,
-        nameserviceId, nnId);
+    // figure out the current number of NNs
+    NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
+    int nnIndex = infos == null ? 0 : infos.length;
+    initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
+    NameNodeInfo info = createNameNode(conf, true, null, null, nameserviceId, nnId);
 
     // Refresh datanodes with the newly started namenode
     for (DataNodeProperties dn : dataNodes) {
@@ -2738,7 +2856,7 @@ public class MiniDFSCluster {
 
     // Wait for new namenode to get registrations from all the datanodes
     waitActive(nnIndex);
-    return nameNodes[nnIndex].nameNode;
+    return info.nameNode;
   }
   
   protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index a99e9c3..b9786a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -56,10 +56,20 @@ public class MiniDFSNNTopology {
    * Set up an HA topology with a single HA nameservice.
    */
   public static MiniDFSNNTopology simpleHATopology() {
-    return new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf("minidfs-ns")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1"))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+    return simpleHATopology(2);
+  }
+
+  /**
+   * Set up an HA topology with a single HA nameservice.
+   * @param nnCount number of namenodes to use with the nameservice
+   */
+  public static MiniDFSNNTopology simpleHATopology(int nnCount) {
+    MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("minidfs-ns");
+    for (int i = 1; i <= nnCount; i++) {
+      nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i));
+    }
+    MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice);
+    return topology;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
index ad907f6..fae1024 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
@@ -303,12 +303,12 @@ public class TestDFSUpgradeFromImage {
     unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);
     
     // Overwrite the md5 stored in the VERSION files
-    File baseDir = new File(MiniDFSCluster.getBaseDirectory());
+    File[] nnDirs = MiniDFSCluster.getNameNodeDirectory(MiniDFSCluster.getBaseDirectory(), 0, 0);
     FSImageTestUtil.corruptVersionFile(
-        new File(baseDir, "name1/current/VERSION"),
+        new File(nnDirs[0], "current/VERSION"),
         "imageMD5Digest", "22222222222222222222222222222222");
     FSImageTestUtil.corruptVersionFile(
-        new File(baseDir, "name2/current/VERSION"),
+        new File(nnDirs[1], "current/VERSION"),
         "imageMD5Digest", "22222222222222222222222222222222");
     
     // Attach our own log appender so we can verify output

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index c4c890f..b50b1cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
 
 /**
  * This class tests rolling upgrade.
@@ -66,7 +67,7 @@ public class TestRollingUpgrade {
    */
   @Test
   public void testDFSAdminRollingUpgradeCommands() throws Exception {
-    // start a cluster 
+    // start a cluster
     final Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     try {
@@ -97,7 +98,7 @@ public class TestRollingUpgrade {
         runCmd(dfsadmin, true, "-rollingUpgrade", "query");
 
         dfs.mkdirs(bar);
-        
+
         //finalize rolling upgrade
         runCmd(dfsadmin, true, "-rollingUpgrade", "finalize");
 
@@ -143,7 +144,7 @@ public class TestRollingUpgrade {
     String nnDirPrefix = MiniDFSCluster.getBaseDirectory() + "/nn/";
     final File nn1Dir = new File(nnDirPrefix + "image1");
     final File nn2Dir = new File(nnDirPrefix + "image2");
-    
+
     LOG.info("nn1Dir=" + nn1Dir);
     LOG.info("nn2Dir=" + nn2Dir);
 
@@ -186,9 +187,9 @@ public class TestRollingUpgrade {
 
       final RollingUpgradeInfo info1;
       {
-        final DistributedFileSystem dfs = cluster.getFileSystem(); 
+        final DistributedFileSystem dfs = cluster.getFileSystem();
         dfs.mkdirs(foo);
-  
+
         //start rolling upgrade
         dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
         info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
@@ -197,7 +198,7 @@ public class TestRollingUpgrade {
 
         //query rolling upgrade
         Assert.assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY));
-  
+
         dfs.mkdirs(bar);
         cluster.shutdown();
       }
@@ -209,8 +210,8 @@ public class TestRollingUpgrade {
         .format(false)
         .manageNameDfsDirs(false)
         .build();
-      final DistributedFileSystem dfs2 = cluster2.getFileSystem(); 
-      
+      final DistributedFileSystem dfs2 = cluster2.getFileSystem();
+
       // Check that cluster2 sees the edits made on cluster1
       Assert.assertTrue(dfs2.exists(foo));
       Assert.assertTrue(dfs2.exists(bar));
@@ -260,7 +261,7 @@ public class TestRollingUpgrade {
 
   @Test
   public void testRollback() throws IOException {
-    // start a cluster 
+    // start a cluster
     final Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     try {
@@ -305,7 +306,7 @@ public class TestRollingUpgrade {
       if(cluster != null) cluster.shutdown();
     }
   }
-  
+
   private static void startRollingUpgrade(Path foo, Path bar,
       Path file, byte[] data,
       MiniDFSCluster cluster) throws IOException {
@@ -327,7 +328,7 @@ public class TestRollingUpgrade {
     TestFileTruncate.checkBlockRecovery(file, dfs);
     AppendTestUtil.checkFullFile(dfs, file, newLength, data);
   }
-  
+
   private static void rollbackRollingUpgrade(Path foo, Path bar,
       Path file, byte[] data,
       MiniDFSCluster cluster) throws IOException {
@@ -372,22 +373,33 @@ public class TestRollingUpgrade {
     }
   }
 
-  @Test (timeout = 300000)
+  @Test(timeout = 300000)
   public void testFinalize() throws Exception {
+    testFinalize(2);
+  }
+
+  @Test(timeout = 300000)
+  public void testFinalizeWithMultipleNN() throws Exception {
+    testFinalize(3);
+  }
+
+  private void testFinalize(int nnCount) throws Exception {
     final Configuration conf = new HdfsConfiguration();
     MiniQJMHACluster cluster = null;
     final Path foo = new Path("/foo");
     final Path bar = new Path("/bar");
 
     try {
-      cluster = new MiniQJMHACluster.Builder(conf).build();
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
-      // let NN1 tail editlog every 1s
-      dfsCluster.getConfiguration(1).setInt(
-          DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-      dfsCluster.restartNameNode(1);
+      // let the other NNs tail the editlog every 1s
+      for(int i=1; i < nnCount; i++) {
+        dfsCluster.getConfiguration(i).setInt(
+            DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      }
+      dfsCluster.restartNameNodes();
 
       dfsCluster.transitionToActive(0);
       DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
@@ -425,17 +437,29 @@ public class TestRollingUpgrade {
 
   @Test (timeout = 300000)
   public void testQuery() throws Exception {
+    testQuery(2);
+  }
+
+  @Test (timeout = 300000)
+  public void testQueryWithMultipleNN() throws Exception {
+    testQuery(3);
+  }
+
+  private void testQuery(int nnCount) throws Exception{
     final Configuration conf = new Configuration();
     MiniQJMHACluster cluster = null;
     try {
-      cluster = new MiniQJMHACluster.Builder(conf).build();
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
       dfsCluster.transitionToActive(0);
       DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
 
-      dfsCluster.shutdownNameNode(1);
+      // shutdown other NNs
+      for (int i = 1; i < nnCount; i++) {
+        dfsCluster.shutdownNameNode(i);
+      }
 
       // start rolling upgrade
       RollingUpgradeInfo info = dfs
@@ -445,13 +469,16 @@ public class TestRollingUpgrade {
       info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
       Assert.assertFalse(info.createdRollbackImages());
 
-      dfsCluster.restartNameNode(1);
-
+      // restart other NNs
+      for (int i = 1; i < nnCount; i++) {
+        dfsCluster.restartNameNode(i);
+      }
+      // check that one of the other NNs has created the rollback image and uploaded it
       queryForPreparation(dfs);
 
       // The NN should have a copy of the fsimage in case of rollbacks.
       Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
-          .hasRollbackFSImage());
+              .hasRollbackFSImage());
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -487,6 +514,15 @@ public class TestRollingUpgrade {
 
   @Test(timeout = 300000)
   public void testCheckpoint() throws IOException, InterruptedException {
+    testCheckpoint(2);
+  }
+
+  @Test(timeout = 300000)
+  public void testCheckpointWithMultipleNN() throws IOException, InterruptedException {
+    testCheckpoint(3);
+  }
+
+  public void testCheckpoint(int nnCount) throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
@@ -495,7 +531,7 @@ public class TestRollingUpgrade {
     final Path foo = new Path("/foo");
 
     try {
-      cluster = new MiniQJMHACluster.Builder(conf).build();
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
@@ -513,16 +549,9 @@ public class TestRollingUpgrade {
       long txid = dfs.rollEdits();
       Assert.assertTrue(txid > 0);
 
-      int retries = 0;
-      while (++retries < 5) {
-        NNStorage storage = dfsCluster.getNamesystem(1).getFSImage()
-            .getStorage();
-        if (storage.getFsImageName(txid - 1) != null) {
-          return;
-        }
-        Thread.sleep(1000);
+      for(int i=1; i< nnCount; i++) {
+        verifyNNCheckpoint(dfsCluster, txid, i);
       }
-      Assert.fail("new checkpoint does not exist");
 
     } finally {
       if (cluster != null) {
@@ -531,6 +560,22 @@ public class TestRollingUpgrade {
     }
   }
 
+  /**
+   * Verify that the namenode at the given index has an FSImage with a TxId up to txid-1
+   */
+  private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid, int nnIndex) throws InterruptedException {
+    int retries = 0;
+    while (++retries < 5) {
+      NNStorage storage = dfsCluster.getNamesystem(nnIndex).getFSImage()
+              .getStorage();
+      if (storage.getFsImageName(txid - 1) != null) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.fail("new checkpoint does not exist");
+  }
+
   static void queryForPreparation(DistributedFileSystem dfs) throws IOException,
       InterruptedException {
     RollingUpgradeInfo info;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index ef4c559..470a08b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -17,43 +17,39 @@
  */
 package org.apache.hadoop.hdfs.qjournal;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.URI;
-import java.util.Random;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 public class MiniQJMHACluster {
   private MiniDFSCluster cluster;
   private MiniJournalCluster journalCluster;
   private final Configuration conf;
   private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class);
-  
+
   public static final String NAMESERVICE = "ns1";
-  private static final String NN1 = "nn1";
-  private static final String NN2 = "nn2";
   private static final Random RANDOM = new Random();
   private int basePort = 10000;
 
   public static class Builder {
     private final Configuration conf;
     private StartupOption startOpt = null;
+    private int numNNs = 2;
     private final MiniDFSCluster.Builder dfsBuilder;
-    
+
     public Builder(Configuration conf) {
       this.conf = conf;
       // most QJMHACluster tests don't need DataNodes, so we'll make
@@ -64,7 +60,7 @@ public class MiniQJMHACluster {
     public MiniDFSCluster.Builder getDfsBuilder() {
       return dfsBuilder;
     }
-    
+
     public MiniQJMHACluster build() throws IOException {
       return new MiniQJMHACluster(this);
     }
@@ -72,15 +68,25 @@ public class MiniQJMHACluster {
     public void startupOption(StartupOption startOpt) {
       this.startOpt = startOpt;
     }
+
+    public Builder setNumNameNodes(int nns) {
+      this.numNNs = nns;
+      return this;
+    }
+  }
+
+  public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) {
+    MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE);
+    for (int i = 0; i < nns; i++) {
+      nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setIpcPort(startingPort++)
+          .setHttpPort(startingPort++));
+    }
+
+    return new MiniDFSNNTopology().addNameservice(nameservice);
   }
-  
+
   public static MiniDFSNNTopology createDefaultTopology(int basePort) {
-    return new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
-        new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort)
-            .setHttpPort(basePort + 1)).addNN(
-        new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2)
-            .setHttpPort(basePort + 3)));
+    return createDefaultTopology(2, basePort);
   }
 
   private MiniQJMHACluster(Builder builder) throws IOException {
@@ -94,10 +100,10 @@ public class MiniQJMHACluster {
             .build();
         URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
 
-        // start cluster with 2 NameNodes
-        MiniDFSNNTopology topology = createDefaultTopology(basePort);
+        // start cluster with the specified number of NameNodes
+        MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, basePort);
 
-        initHAConf(journalURI, builder.conf);
+        initHAConf(journalURI, builder.conf, builder.numNNs);
 
         // First start up the NNs just to format the namespace. The MinIDFSCluster
         // has no way to just format the NameNodes without also starting them.
@@ -110,8 +116,9 @@ public class MiniQJMHACluster {
         Configuration confNN0 = cluster.getConfiguration(0);
         NameNode.initializeSharedEdits(confNN0, true);
 
-        cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
-        cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+        for (MiniDFSCluster.NameNodeInfo nn : cluster.getNameNodeInfos()) {
+          nn.setStartOpt(builder.startOpt);
+        }
 
         // restart the cluster
         cluster.restartNameNodes();
@@ -123,31 +130,28 @@ public class MiniQJMHACluster {
       }
     }
   }
-  
-  private Configuration initHAConf(URI journalURI, Configuration conf) {
+
+  private Configuration initHAConf(URI journalURI, Configuration conf, int numNNs) {
     conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
         journalURI.toString());
-    
-    String address1 = "127.0.0.1:" + basePort;
-    String address2 = "127.0.0.1:" + (basePort + 2);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        NAMESERVICE, NN1), address1);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        NAMESERVICE, NN2), address2);
-    conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
-    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
-        NN1 + "," + NN2);
-    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
-        ConfiguredFailoverProxyProvider.class.getName());
-    conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
-    
+
+    List<String> nns = new ArrayList<String>(numNNs);
+    int port = basePort;
+    for (int i = 0; i < numNNs; i++) {
+      nns.add("127.0.0.1:" + port);
+      // increment by 2 each time to account for the http port in the config setting
+      port += 2;
+    }
+
+    // use standard failover configurations
+    HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns);
     return conf;
   }
 
   public MiniDFSCluster getDfsCluster() {
     return cluster;
   }
-  
+
   public MiniJournalCluster getJournalCluster() {
     return journalCluster;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index d5a9426..b203872 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -162,7 +162,7 @@ public class TestBlockToken {
   public void testWritable() throws Exception {
     TestWritable.testWritable(new BlockTokenIdentifier());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
     TestWritable.testWritable(generateTokenId(sm, block2,
@@ -201,7 +201,7 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenSecretManager() throws Exception {
     BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
     ExportedBlockKeys keys = masterHandler.exportKeys();
@@ -244,7 +244,7 @@ public class TestBlockToken {
     UserGroupInformation.setConfiguration(conf);
     
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
 
@@ -283,7 +283,7 @@ public class TestBlockToken {
     
     Assume.assumeTrue(FD_DIR.exists());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
 
@@ -352,7 +352,7 @@ public class TestBlockToken {
     for (int i = 0; i < 10; i++) {
       String bpid = Integer.toString(i);
       BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+          blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
       BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
           blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
       bpMgr.addBlockPool(bpid, slaveHandler);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
index f01be4b..0818571 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
@@ -319,7 +319,7 @@ public class TestBackupNode {
       if(fileSys != null) fileSys.close();
       if(cluster != null) cluster.shutdown();
     }
-    File nnCurDir = new File(BASE_DIR, "name1/current/");
+    File nnCurDir = new File(MiniDFSCluster.getNameNodeDirectory(BASE_DIR, 0, 0)[0], "current/");
     File bnCurDir = new File(getBackupNodeDir(op, 1), "/current/");
 
     FSImageTestUtil.assertParallelFilesAreIdentical(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 5a51cb7..7073726 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -1428,7 +1428,8 @@ public class TestCheckpoint {
       //
       secondary = startSecondaryNameNode(conf);
 
-      File secondaryDir = new File(MiniDFSCluster.getBaseDirectory(), "namesecondary1");
+      File secondaryDir = MiniDFSCluster.getCheckpointDirectory(MiniDFSCluster.getBaseDirectory(),
+        0, 0)[0];
       File secondaryCurrent = new File(secondaryDir, "current");
 
       long expectedTxIdToDownload = cluster.getNameNode().getFSImage()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
index 5b72901..a736d27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
@@ -42,7 +42,8 @@ public class HAStressTestHarness {
   private MiniDFSCluster cluster;
   static final int BLOCK_SIZE = 1024;
   final TestContext testCtx = new TestContext();
-  
+  private int nns = 2;
+
   public HAStressTestHarness() {
     conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -55,11 +56,19 @@ public class HAStressTestHarness {
   }
 
   /**
+   * Set the number of namenodes that should be run. This must be set before calling
+   * {@link #startCluster()}
+   */
+  public void setNumberOfNameNodes(int nns) {
+    this.nns = nns;
+  }
+
+  /**
    * Start and return the MiniDFSCluster.
    */
   public MiniDFSCluster startCluster() throws IOException {
     cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .nnTopology(MiniDFSNNTopology.simpleHATopology(nns))
       .numDataNodes(3)
       .build();
     return cluster;
@@ -99,28 +108,27 @@ public class HAStressTestHarness {
   }
 
   /**
-   * Add a thread which periodically triggers failover back and forth between
-   * the two namenodes.
+   * Add a thread which periodically triggers failover back and forth between the namenodes.
    */
   public void addFailoverThread(final int msBetweenFailovers) {
     testCtx.addThread(new RepeatingTestThread(testCtx) {
-      
       @Override
       public void doAnAction() throws Exception {
-        System.err.println("==============================\n" +
-            "Failing over from 0->1\n" +
-            "==================================");
-        cluster.transitionToStandby(0);
-        cluster.transitionToActive(1);
-        
-        Thread.sleep(msBetweenFailovers);
-        System.err.println("==============================\n" +
-            "Failing over from 1->0\n" +
-            "==================================");
-
-        cluster.transitionToStandby(1);
-        cluster.transitionToActive(0);
-        Thread.sleep(msBetweenFailovers);
+        // fail over from one namenode to the next, all the way back to the original NN
+        for (int i = 0; i < nns; i++) {
+          // next node, mod nns so we wrap to the 0th NN on the last iteration
+          int next = (i + 1) % nns;
+          System.err.println("==============================\n"
+              + "[Starting] Failing over from " + i + "->" + next + "\n"
+              + "==============================");
+          cluster.transitionToStandby(i);
+          cluster.transitionToActive(next);
+          System.err.println("==============================\n"
+              + "[Completed] Failing over from " + i + "->" + next + ". Sleeping for "+
+              (msBetweenFailovers/1000) +"sec \n"
+              + "==============================");
+          Thread.sleep(msBetweenFailovers);
+        }
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index c7c4a77..5543a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -24,9 +24,14 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,12 +72,11 @@ public abstract class HATestUtil {
    */
   public static void waitForStandbyToCatchUp(NameNode active,
       NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
-    
     long activeTxId = active.getNamesystem().getFSImage().getEditLog()
       .getLastWrittenTxId();
-    
+
     active.getRpcServer().rollEditLog();
-    
+
     long start = Time.now();
     while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) {
       long nn2HighestTxId = standby.getNamesystem().getFSImage()
@@ -166,34 +170,52 @@ public abstract class HATestUtil {
   /** Sets the required configurations for performing failover.  */
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
       Configuration conf, String logicalName, int nsIndex) {
-    InetSocketAddress nnAddr1 = cluster.getNameNode(2 * nsIndex).getNameNodeAddress();
-    InetSocketAddress nnAddr2 = cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress();
-    setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2);
+    MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
+    List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
+    for (MiniDFSCluster.NameNodeInfo nn : nns) {
+      nnAddresses.add(nn.nameNode.getNameNodeAddress());
+    }
+    setFailoverConfigurations(conf, logicalName, nnAddresses);
+  }
+
+  public static void setFailoverConfigurations(Configuration conf, String logicalName,
+      InetSocketAddress ... nnAddresses){
+    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses));
   }
 
   /**
    * Sets the required configurations for performing failover
    */
   public static void setFailoverConfigurations(Configuration conf,
-      String logicalName, InetSocketAddress nnAddr1,
-      InetSocketAddress nnAddr2) {
-    String nameNodeId1 = "nn1";
-    String nameNodeId2 = "nn2";
-    String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
-    String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        logicalName, nameNodeId1), address1);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        logicalName, nameNodeId2), address2);
-    
+      String logicalName, List<InetSocketAddress> nnAddresses) {
+    setFailoverConfigurations(conf, logicalName,
+        Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
+
+          // transform the inet address to a simple string
+          @Override
+          public String apply(InetSocketAddress addr) {
+            return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
+          }
+        }));
+  }
+
+  public static void setFailoverConfigurations(Configuration conf, String logicalName,
+      Iterable<String> nnAddresses) {
+    List<String> nnids = new ArrayList<String>();
+    int i = 0;
+    for (String address : nnAddresses) {
+      String nnId = "nn" + (i + 1);
+      nnids.add(nnId);
+      conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address);
+      i++;
+    }
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
     conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
-        nameNodeId1 + "," + nameNodeId2);
+        Joiner.on(',').join(nnids));
     conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
         ConfiguredFailoverProxyProvider.class.getName());
     conf.set("fs.defaultFS", "hdfs://" + logicalName);
   }
-  
 
   public static String getLogicalHostname(MiniDFSCluster cluster) {
     return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
index 7abc502..16dc766 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
@@ -46,37 +46,47 @@ import com.google.common.collect.ImmutableList;
 
 public class TestBootstrapStandby {
   private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
-  
+
+  private static final int maxNNCount = 3;
+  private static final int STARTING_PORT = 20000;
+
   private MiniDFSCluster cluster;
   private NameNode nn0;
-  
+
   @Before
   public void setupCluster() throws IOException {
     Configuration conf = new Configuration();
 
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(20001))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(20002)));
-    
+    // duplicate code with MiniQJMHACluster#createDefaultTopology, but don't want to cross
+    // dependencies or munge too much code to support it all correctly
+    MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("ns1");
+    for (int i = 0; i < maxNNCount; i++) {
+      nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setHttpPort(STARTING_PORT + i + 1));
+    }
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice);
+
     cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(topology)
-      .numDataNodes(0)
-      .build();
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
     cluster.waitActive();
-    
+
     nn0 = cluster.getNameNode(0);
     cluster.transitionToActive(0);
-    cluster.shutdownNameNode(1);
+    // shutdown the other NNs
+    for (int i = 1; i < maxNNCount; i++) {
+      cluster.shutdownNameNode(i);
+    }
   }
-  
+
   @After
   public void shutdownCluster() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
-  
+
   /**
    * Test for the base success case. The primary NN
    * hasn't made any checkpoints, and we copy the fsimage_0
@@ -85,30 +95,29 @@ public class TestBootstrapStandby {
   @Test
   public void testSuccessfulBaseCase() throws Exception {
     removeStandbyNameDirs();
-    
-    try {
-      cluster.restartNameNode(1);
-      fail("Did not throw");
-    } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains(
-          "storage directory does not exist or is not accessible",
-          ioe);
+
+    // skip the first NN, it's already up
+    for (int index = 1; index < maxNNCount; index++) {
+      try {
+        cluster.restartNameNode(index);
+        fail("Did not throw");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "storage directory does not exist or is not accessible", ioe);
+      }
+
+      int rc = BootstrapStandby.run(new String[] { "-nonInteractive" },
+          cluster.getConfiguration(index));
+      assertEquals(0, rc);
+
+      // Should have copied over the namespace from the active
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, index, ImmutableList.of(0));
     }
-    
-    int rc = BootstrapStandby.run(
-        new String[]{"-nonInteractive"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the active
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of(0));
-    FSImageTestUtil.assertNNFilesMatch(cluster);
 
-    // We should now be able to start the standby successfully.
-    cluster.restartNameNode(1);
+    // We should now be able to start the standbys successfully.
+    restartNameNodesFromIndex(1);
   }
-  
+
   /**
    * Test for downloading a checkpoint made at a later checkpoint
    * from the active.
@@ -123,21 +132,21 @@ public class TestBootstrapStandby {
     NameNodeAdapter.saveNamespace(nn0);
     NameNodeAdapter.leaveSafeMode(nn0);
     long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
-      .getFSImage().getMostRecentCheckpointTxId();
+        .getFSImage().getMostRecentCheckpointTxId();
     assertEquals(6, expectedCheckpointTxId);
 
-    int rc = BootstrapStandby.run(
-        new String[]{"-force"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the active
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of((int)expectedCheckpointTxId));
+    for (int i = 1; i < maxNNCount; i++) {
+      assertEquals(0, forceBootstrap(i));
+
+      // Should have copied over the namespace from the active
+      LOG.info("Checking namenode: " + i);
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
+          ImmutableList.of((int) expectedCheckpointTxId));
+    }
     FSImageTestUtil.assertNNFilesMatch(cluster);
 
     // We should now be able to start the standby successfully.
-    cluster.restartNameNode(1);
+    restartNameNodesFromIndex(1);
   }
 
   /**
@@ -147,36 +156,40 @@ public class TestBootstrapStandby {
   @Test
   public void testSharedEditsMissingLogs() throws Exception {
     removeStandbyNameDirs();
-    
+
     CheckpointSignature sig = nn0.getRpcServer().rollEditLog();
     assertEquals(3, sig.getCurSegmentTxId());
-    
+
     // Should have created edits_1-2 in shared edits dir
-    URI editsUri = cluster.getSharedEditsDir(0, 1);
+    URI editsUri = cluster.getSharedEditsDir(0, maxNNCount - 1);
     File editsDir = new File(editsUri);
-    File editsSegment = new File(new File(editsDir, "current"),
+    File currentDir = new File(editsDir, "current");
+    File editsSegment = new File(currentDir,
         NNStorage.getFinalizedEditsFileName(1, 2));
     GenericTestUtils.assertExists(editsSegment);
+    GenericTestUtils.assertExists(currentDir);
 
     // Delete the segment.
     assertTrue(editsSegment.delete());
-    
+
     // Trying to bootstrap standby should now fail since the edit
     // logs aren't available in the shared dir.
     LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
         LogFactory.getLog(BootstrapStandby.class));
     try {
-      int rc = BootstrapStandby.run(
-          new String[]{"-force"},
-          cluster.getConfiguration(1));
-      assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, rc);
+      assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, forceBootstrap(1));
     } finally {
       logs.stopCapturing();
     }
     GenericTestUtils.assertMatches(logs.getOutput(),
         "FATAL.*Unable to read transaction ids 1-3 from the configured shared");
   }
-  
+
+  /**
+   * Show that bootstrapping will fail on a given NameNode if its directories already exist. It's not
+   * run across all the NNs because it's testing the state local to each node.
+   * @throws Exception on unexpected failure
+   */
   @Test
   public void testStandbyDirsAlreadyExist() throws Exception {
     // Should not pass since standby dirs exist, force not given
@@ -186,12 +199,9 @@ public class TestBootstrapStandby {
     assertEquals(BootstrapStandby.ERR_CODE_ALREADY_FORMATTED, rc);
 
     // Should pass with -force
-    rc = BootstrapStandby.run(
-        new String[]{"-force"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
+    assertEquals(0, forceBootstrap(1));
   }
-  
+
   /**
    * Test that, even if the other node is not active, we are able
    * to bootstrap standby from it.
@@ -199,18 +209,44 @@ public class TestBootstrapStandby {
   @Test(timeout=30000)
   public void testOtherNodeNotActive() throws Exception {
     cluster.transitionToStandby(0);
-    int rc = BootstrapStandby.run(
-        new String[]{"-force"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
+    assertSuccessfulBootstrapFromIndex(1);
   }
 
   private void removeStandbyNameDirs() {
-    for (URI u : cluster.getNameDirs(1)) {
-      assertTrue(u.getScheme().equals("file"));
-      File dir = new File(u.getPath());
-      LOG.info("Removing standby dir " + dir);
-      assertTrue(FileUtil.fullyDelete(dir));
+    for (int i = 1; i < maxNNCount; i++) {
+      for (URI u : cluster.getNameDirs(i)) {
+        assertTrue(u.getScheme().equals("file"));
+        File dir = new File(u.getPath());
+        LOG.info("Removing standby dir " + dir);
+        assertTrue(FileUtil.fullyDelete(dir));
+      }
+    }
+  }
+
+  private void restartNameNodesFromIndex(int start) throws IOException {
+    for (int i = start; i < maxNNCount; i++) {
+      // We should now be able to start the standby successfully.
+      cluster.restartNameNode(i, false);
+    }
+
+    cluster.waitClusterUp();
+    cluster.waitActive();
+  }
+
+  /**
+   * Force bootstrapping on a namenode
+   * @param i index of the namenode to attempt
+   * @return exit code
+   * @throws Exception on unexpected failure
+   */
+  private int forceBootstrap(int i) throws Exception {
+    return BootstrapStandby.run(new String[] { "-force" },
+        cluster.getConfiguration(i));
+  }
+
+  private void assertSuccessfulBootstrapFromIndex(int start) throws Exception {
+    for (int i = start; i < maxNNCount; i++) {
+      assertEquals(0, forceBootstrap(i));
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
index ca8f563..db9a2de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
@@ -52,7 +52,8 @@ public class TestBootstrapStandbyWithQJM {
 
   private MiniDFSCluster cluster;
   private MiniJournalCluster jCluster;
-  
+  private int nnCount = 3;
+
   @Before
   public void setup() throws Exception {
     Configuration conf = new Configuration();
@@ -62,7 +63,8 @@ public class TestBootstrapStandbyWithQJM {
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
         0);
 
-    MiniQJMHACluster miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
+    MiniQJMHACluster miniQjmHaCluster =
+        new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
     cluster = miniQjmHaCluster.getDfsCluster();
     jCluster = miniQjmHaCluster.getJournalCluster();
     
@@ -90,18 +92,7 @@ public class TestBootstrapStandbyWithQJM {
   public void testBootstrapStandbyWithStandbyNN() throws Exception {
     // make the first NN in standby state
     cluster.transitionToStandby(0);
-    Configuration confNN1 = cluster.getConfiguration(1);
-    
-    // shut down nn1
-    cluster.shutdownNameNode(1);
-    
-    int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the standby
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of(0));
-    FSImageTestUtil.assertNNFilesMatch(cluster);
+    bootstrapStandbys();
   }
   
   /** BootstrapStandby when the existing NN is active */
@@ -109,17 +100,23 @@ public class TestBootstrapStandbyWithQJM {
   public void testBootstrapStandbyWithActiveNN() throws Exception {
     // make the first NN in active state
     cluster.transitionToActive(0);
-    Configuration confNN1 = cluster.getConfiguration(1);
-    
-    // shut down nn1
-    cluster.shutdownNameNode(1);
-    
-    int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the standby
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of(0));
+    bootstrapStandbys();
+  }
+
+  private void bootstrapStandbys() throws Exception {
+    // shutdown and bootstrap all the other nns, except the first (start 1, not 0)
+    for (int i = 1; i < nnCount; i++) {
+      Configuration otherNNConf = cluster.getConfiguration(i);
+
+      // shut down other nn
+      cluster.shutdownNameNode(i);
+
+      int rc = BootstrapStandby.run(new String[] { "-force" }, otherNNConf);
+      assertEquals(0, rc);
+
+      // Should have copied over the namespace from the standby
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, i, ImmutableList.of(0));
+    }
     FSImageTestUtil.assertNNFilesMatch(cluster);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
index e7cba75..9164582 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
@@ -107,6 +107,7 @@ public class TestDNFencingWithReplication {
   @Test
   public void testFencingStress() throws Exception {
     HAStressTestHarness harness = new HAStressTestHarness();
+    harness.setNumberOfNameNodes(3);
     harness.conf.setInt(
         DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
     harness.conf.setInt(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 8c61c92..aea4f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -113,7 +113,12 @@ public class TestEditLogTailer {
   public void testNN1TriggersLogRolls() throws Exception {
     testStandbyTriggersLogRolls(1);
   }
-  
+
+  @Test
+  public void testNN2TriggersLogRolls() throws Exception {
+    testStandbyTriggersLogRolls(2);
+  }
+
   private static void testStandbyTriggersLogRolls(int activeIndex)
       throws Exception {
     Configuration conf = new Configuration();
@@ -125,7 +130,8 @@ public class TestEditLogTailer {
     MiniDFSNNTopology topology = new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
         .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10031))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10032)));
+        .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10032))
+        .addNN(new MiniDFSNNTopology.NNConf("nn3").setIpcPort(10033)));
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(topology)
@@ -145,7 +151,7 @@ public class TestEditLogTailer {
   
   private static void waitForLogRollInSharedDir(MiniDFSCluster cluster,
       long startTxId) throws Exception {
-    URI sharedUri = cluster.getSharedEditsDir(0, 1);
+    URI sharedUri = cluster.getSharedEditsDir(0, 2);
     File sharedDir = new File(sharedUri.getPath(), "current");
     final File expectedLog = new File(sharedDir,
         NNStorage.getInProgressEditsFileName(startTxId));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
index 151e7d3..116079a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
@@ -56,10 +56,11 @@ public class TestFailoverWithBlockTokensEnabled {
   
   private static final Path TEST_PATH = new Path("/test-path");
   private static final String TEST_DATA = "very important text";
-  
+  private static final int numNNs = 3;
+
   private Configuration conf;
   private MiniDFSCluster cluster;
-  
+
   @Before
   public void startCluster() throws IOException {
     conf = new Configuration();
@@ -67,7 +68,7 @@ public class TestFailoverWithBlockTokensEnabled {
     // Set short retry timeouts so this test runs faster
     conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .nnTopology(MiniDFSNNTopology.simpleHATopology(numNNs))
         .numDataNodes(1)
         .build();
   }
@@ -78,33 +79,41 @@ public class TestFailoverWithBlockTokensEnabled {
       cluster.shutdown();
     }
   }
-  
+
   @Test
   public void ensureSerialNumbersNeverOverlap() {
     BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager()
         .getBlockTokenSecretManager();
     BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager()
         .getBlockTokenSecretManager();
-    
-    btsm1.setSerialNo(0);
-    btsm2.setSerialNo(0);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-    
-    btsm1.setSerialNo(Integer.MAX_VALUE);
-    btsm2.setSerialNo(Integer.MAX_VALUE);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-    
-    btsm1.setSerialNo(Integer.MIN_VALUE);
-    btsm2.setSerialNo(Integer.MIN_VALUE);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-    
-    btsm1.setSerialNo(Integer.MAX_VALUE / 2);
-    btsm2.setSerialNo(Integer.MAX_VALUE / 2);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    BlockTokenSecretManager btsm3 = cluster.getNamesystem(2).getBlockManager()
+        .getBlockTokenSecretManager();
+
+    setAndCheckSerialNumber(0, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MAX_VALUE, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MIN_VALUE, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MAX_VALUE / 2, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MIN_VALUE / 2, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MAX_VALUE / 3, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MIN_VALUE / 3, btsm1, btsm2, btsm3);
+  }
+
+  private void setAndCheckSerialNumber(int serialNumber, BlockTokenSecretManager... btsms) {
+    for (BlockTokenSecretManager btsm : btsms) {
+      btsm.setSerialNo(serialNumber);
+    }
 
-    btsm1.setSerialNo(Integer.MIN_VALUE / 2);
-    btsm2.setSerialNo(Integer.MIN_VALUE / 2);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    for (int i = 0; i < btsms.length; i++) {
+      for (int j = 0; j < btsms.length; j++) {
+        if (j == i) {
+          continue;
+        }
+        int first = btsms[i].getSerialNoForTesting();
+        int second = btsms[j].getSerialNoForTesting();
+        assertFalse("Overlap found for set serial number (" + serialNumber + ") is " + i + ": "
+            + first + " == " + j + ": " + second, first == second);
+      }
+    }
   }
   
   @Test


Mime
View raw message