hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1076040 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/fs/ src/test/aop/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/fs/ src/test/hdfs/org/apache/hadoop/h...
Date Tue, 01 Mar 2011 22:43:41 GMT
Author: suresh
Date: Tue Mar  1 22:43:39 2011
New Revision: 1076040

URL: http://svn.apache.org/viewvc?rev=1076040&view=rev
Log:
HDFS-1652. Add support for multiple namenodes in MiniDFSCluster. Contributed by Suresh Srinivas.
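
For orientation, here is a minimal sketch of the test-facing API this change introduces (Builder.numNameNodes plus the index-taking accessors such as getURI(int) and getFileSystem(int), all visible in the MiniDFSCluster diff below). The class name and main wrapper are hypothetical scaffolding, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    // Illustrative sketch only; not part of this commit.
    public class MultiNameNodeExample {  // hypothetical class name
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numNameNodes(2)   // two federated namenodes
            .numDataNodes(1)   // one datanode registering with both
            .build();
        try {
          cluster.waitActive();  // datanodes registered with every namenode
          FileSystem fs0 = cluster.getFileSystem(0);  // client for namenode 0
          FileSystem fs1 = cluster.getFileSystem(1);  // client for namenode 1
          System.out.println("nn0=" + cluster.getURI(0)
              + " nn1=" + cluster.getURI(1));
        } finally {
          cluster.shutdown();  // stops all namenodes and datanodes
        }
      }
    }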

Removed:
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fs/TestFiRename.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsPermission.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Mar  1 22:43:39 2011
@@ -83,17 +83,20 @@ Trunk (unreleased changes)
 
     HDFS-1669. Federation: Fix TestHftpFileSystem failure. (suresh)
 
-    HDFS-1670. HDFS Federation: remove dnRegistration from Datanode (boryas)
+    HDFS-1670. Federation: remove dnRegistration from Datanode (boryas)
 
-    HDFS-1662. HDFS federation: fix unit test case, TestCheckpoint 
+    HDFS-1662. Federation: fix unit test case, TestCheckpoint 
     and TestDataNodeMXBean (tanping via boryas)
 
-    HDFS-1671. HDFS Federation: shutdown in DataNode should be able to 
+    HDFS-1671. Federation: shutdown in DataNode should be able to 
     shutdown individual BP threads as well as the whole DN (boryas).
 
-    HDFS-1663. HDFS federation: Rename getPoolId() everywhere to 
+    HDFS-1663. Federation: Rename getPoolId() everywhere to 
     getBlockPoolId() (tanping via boryas)
 
+    HDFS-1652. Add support for multiple namenodes in MiniDFSCluster.
+    (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar  1 22:43:39 2011
@@ -2430,7 +2430,7 @@ public class DataNode extends Configured
     return clusterId;
   }
   
-  void refreshNamenodes(Configuration conf) throws IOException {
+  public void refreshNamenodes(Configuration conf) throws IOException {
     try {
       blockPoolManager.refreshNamenodes(conf);
     } catch (InterruptedException ex) {
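
Widening refreshNamenodes to public lets test code re-point a live datanode at an updated namenode list. Below is a hedged sketch of that calling pattern, the same one the new MiniDFSCluster.addNameNode uses further down; the holder class, method name, and both parameters are assumptions for illustration:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;

    // Illustrative sketch only; not part of this commit.
    class RefreshNamenodesSketch {  // hypothetical holder class
      // "dn" is a running DataNode; "nnList" is a comma-separated list of
      // hdfs:// URIs, the format stored under DFS_FEDERATION_NAMENODES.
      static void repoint(DataNode dn, String nnList) throws IOException {
        Configuration dnConf = dn.getConf();
        dnConf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnList);
        // Now public: starts/stops per-block-pool service threads in the DN
        dn.refreshNamenodes(dnConf);
      }
    }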

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fs/TestFiRename.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fs/TestFiRename.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fs/TestFiRename.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fs/TestFiRename.java Tue Mar  1 22:43:39 2011
@@ -89,7 +89,7 @@ public class TestFiRename {
     }
     cluster = new MiniDFSCluster(CONF, 1, format, null);
     cluster.waitClusterUp();
-    fc = FileContext.getFileContext(cluster.getURI(), CONF);
+    fc = FileContext.getFileContext(cluster.getURI(0), CONF);
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java Tue Mar  1 22:43:39 2011
@@ -130,7 +130,7 @@ public class TestFiHftp {
     DFSTestUtil.waitReplication(dfs, filepath, DATANODE_NUM);
 
     //test hftp open and read
-    final HftpFileSystem hftpfs = cluster.getHftpFileSystem();
+    final HftpFileSystem hftpfs = cluster.getHftpFileSystem(0);
     {
       final FSDataInputStream in = hftpfs.open(filepath);
       long bytesRead = 0;

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java Tue Mar  1 22:43:39 2011
@@ -43,7 +43,7 @@ public class TestFcHdfsCreateMkdir exten
                                     throws IOException, LoginException, URISyntaxException  {
     Configuration conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    fc = FileContext.getFileContext(cluster.getURI(), conf);
+    fc = FileContext.getFileContext(cluster.getURI(0), conf);
     defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
         UserGroupInformation.getCurrentUser().getShortUserName()));
     fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsPermission.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsPermission.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsPermission.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsPermission.java Tue Mar  1 22:43:39 2011
@@ -43,7 +43,7 @@ public class TestFcHdfsPermission extend
                                     throws IOException, LoginException, URISyntaxException  {
     Configuration conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    fc = FileContext.getFileContext(cluster.getURI(), conf);
+    fc = FileContext.getFileContext(cluster.getURI(0), conf);
     defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
         UserGroupInformation.getCurrentUser().getShortUserName()));
     fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java Tue Mar  1 22:43:39 2011
@@ -55,7 +55,7 @@ public class TestFcHdfsSymlink extends F
   }
 
   protected URI testURI() {
-    return cluster.getURI();
+    return cluster.getURI(0);
   }
 
   @Override
@@ -72,7 +72,7 @@ public class TestFcHdfsSymlink extends F
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
     conf.set(FsPermission.UMASK_LABEL, "000");
     cluster = new MiniDFSCluster.Builder(conf).build();
-    fc = FileContext.getFileContext(cluster.getURI());
+    fc = FileContext.getFileContext(cluster.getURI(0));
   }
   
   @AfterClass
@@ -129,7 +129,7 @@ public class TestFcHdfsSymlink extends F
     // Ditto when using another file context since the file system
     // for the slash is resolved according to the link's parent.
     FileContext localFc = FileContext.getLocalFSFileContext();
-    Path linkQual = new Path(cluster.getURI().toString(), fileViaLink); 
+    Path linkQual = new Path(cluster.getURI(0).toString(), fileViaLink); 
     assertEquals(fileSize, localFc.getFileStatus(linkQual).getLen());    
   }
   

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java Tue Mar  1 22:43:39 2011
@@ -50,7 +50,7 @@ public class TestHDFSFileContextMainOper
       LoginException, URISyntaxException {
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
     cluster.waitClusterUp();
-    fc = FileContext.getFileContext(cluster.getURI(), CONF);
+    fc = FileContext.getFileContext(cluster.getURI(0), CONF);
     defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
         UserGroupInformation.getCurrentUser().getShortUserName()));
     fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
@@ -64,7 +64,7 @@ public class TestHDFSFileContextMainOper
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1)
                                               .format(false).build();
     cluster.waitClusterUp();
-    fc = FileContext.getFileContext(cluster.getURI(), CONF);
+    fc = FileContext.getFileContext(cluster.getURI(0), CONF);
     defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
         UserGroupInformation.getCurrentUser().getShortUserName()));
     fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Mar  1 22:43:39 2011
@@ -78,6 +78,7 @@ public class MiniDFSCluster {
   public static class Builder {
     private int nameNodePort = 0;
     private final Configuration conf;
+    private int numNameNodes = 1;
     private int numDataNodes = 1;
     private boolean format = true;
     private boolean manageNameDfsDirs = true;
@@ -103,6 +104,14 @@ public class MiniDFSCluster {
     /**
      * Default: 1
      */
+    public Builder numNameNodes(int val) {
+      this.numNameNodes = val;
+      return this;
+    }
+
+    /**
+     * Default: 1
+     */
     public Builder numDataNodes(int val) {
       this.numDataNodes = val;
       return this;
@@ -184,6 +193,7 @@ public class MiniDFSCluster {
    * Used by builder to create and return an instance of MiniDFSCluster
    */
   private MiniDFSCluster(Builder builder) throws IOException {
+    nameNodes = new NameNodeInfo[builder.numNameNodes];
     initMiniDFSCluster(builder.nameNodePort,
                        builder.conf,
                        builder.numDataNodes,
@@ -210,9 +220,8 @@ public class MiniDFSCluster {
   }
   private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
 
-  private URI myUri = null;
   private Configuration conf;
-  private NameNode nameNode;
+  private NameNodeInfo[] nameNodes;
   private int numDataNodes;
   private ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
@@ -221,12 +230,24 @@ public class MiniDFSCluster {
   
   public final static String FINALIZED_DIR_NAME = "/current/finalized/";
   
+  /**
+   * Stores the information related to a namenode in the cluster
+   */
+  static class NameNodeInfo {
+    final NameNode nameNode;
+    final Configuration conf;
+    NameNodeInfo(NameNode nn, Configuration conf) {
+      this.nameNode = nn;
+      this.conf = new Configuration(conf);
+    }
+  }
   
   /**
    * This null constructor is used only when wishing to start a data node cluster
    * without a name node (ie when the name node is started elsewhere).
    */
   public MiniDFSCluster() {
+    nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
   }
   
   /**
@@ -387,7 +408,9 @@ public class MiniDFSCluster {
                         StartupOption operation,
                         String[] racks, String hosts[],
                         long[] simulatedCapacities) throws IOException {
-    initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
+    this.numDataNodes = numDataNodes;
+    this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
+    initMiniDFSCluster(nameNodePort, conf, 1, format,
         manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
         simulatedCapacities, null);
   }
@@ -422,17 +445,50 @@ public class MiniDFSCluster {
       conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
                       false);
     }
-
+    
+    // Create namenodes in the cluster
+    for (int i = 0; i < nameNodes.length; i++) {
+      int nnPort = nameNodePort + 2 * i;
+      createNameNode(i, conf, numDataNodes, manageNameDfsDirs, nnPort,
+          format, operation, clusterId);
+    }
+    // Set default URI for a single namenode cluster
+    if (nameNodes.length == 1) {
+      FileSystem.setDefaultUri(conf, getURI(0));
+    }
+    
+    if (format) {
+      if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
+        throw new IOException("Cannot remove data directory: " + data_dir);
+      }
+    }
+    
+    // Start the DataNodes
+    startDataNodes(conf, numDataNodes, manageDataDfsDirs, 
+                    operation, racks, hosts, simulatedCapacities);
+    waitClusterUp();
+    //make sure ProxyUsers uses the latest conf
+    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+  }
+  
+  private void createNameNode(int nnIndex, Configuration conf,
+              int numDataNodes,
+              boolean manageNameDfsDirs,
+              int nameNodePort,
+              boolean format,
+              StartupOption operation,
+              String clusterId) throws IOException {
+    // TODO:FEDERATION cleanup - cluster configuration needs to change
     // Setup the NameNode configuration
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
+    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "127.0.0.1:" + nameNodePort);
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");  
     if (manageNameDfsDirs) {
       conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
-          fileAsURI(new File(base_dir, "name1"))+","+
-          fileAsURI(new File(base_dir, "name2")));
+          fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
+          fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
       conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
-          fileAsURI(new File(base_dir, "namesecondary1"))+","+
-          fileAsURI(new File(base_dir, "namesecondary2")));
+          fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
+          fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
     }
     
     int replication = conf.getInt("dfs.replication", 3);
@@ -442,9 +498,6 @@ public class MiniDFSCluster {
     
     // Format and clean out DataNode directories
     if (format) {
-      if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
-        throw new IOException("Cannot remove data directory: " + data_dir);
-      }
       GenericTestUtils.formatNamenode(conf);
     }
     if (operation == StartupOption.UPGRADE){
@@ -458,21 +511,8 @@ public class MiniDFSCluster {
       new String[] {} : new String[] {operation.getName()};
     conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
                    StaticMapping.class, DNSToSwitchMapping.class);
-    nameNode = NameNode.createNameNode(args, conf);
-    
-    // Start the DataNodes
-    startDataNodes(conf, numDataNodes, manageDataDfsDirs, 
-                    operation, racks, hosts, simulatedCapacities);
-    waitClusterUp();
-    String myUriStr = "hdfs://localhost:"+ Integer.toString(this.getNameNodePort());
-    try {
-      this.myUri = new URI(myUriStr);
-    } catch (URISyntaxException e) {
-      NameNode.LOG.warn("unexpected URISyntaxException: " + e );
-    }
-    
-    //make sure ProxyUsers uses the latest conf
-    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+    NameNode nn = NameNode.createNameNode(args, conf);
+    nameNodes[nnIndex] = new NameNodeInfo(nn, conf);
   }
   
   private void setRpcEngine(Configuration conf, Class<?> protocol, Class<?> engine) {
@@ -480,24 +520,42 @@ public class MiniDFSCluster {
   }
 
   /**
-   * 
-   * @return URI of this MiniDFSCluster
+   * @return URI of the given namenode in MiniDFSCluster
    */
-  public URI getURI() {
-    return myUri;
+  public URI getURI(int nnIndex) {
+    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getNameNodeAddress();
+    String hostPort = NameNode.getHostPortString(addr);
+    URI uri = null;
+    try {
+      uri = new URI("hdfs://" + hostPort);
+    } catch (URISyntaxException e) {
+      NameNode.LOG.warn("unexpected URISyntaxException: " + e );
+    }
+    return uri;
   }
 
   /**
-   * Get configuration.
-   * @return Configuration of this MiniDFSCluster
+   * @return Configuration for the given namenode
    */
-  public Configuration getConfiguration() {
-    return conf;
+  public Configuration getConfiguration(int nnIndex) {
+    return nameNodes[nnIndex].conf;
   }
 
   /**
-   * wait for the cluster to get out of 
-   * safemode.
+   * wait for the given namenode to get out of safemode.
+   */
+  public void waitNameNodeUp(int nnIndex) {
+    while (!isNameNodeUp(nnIndex)) {
+      try {
+        LOG.warn("Waiting for namenode at " + nnIndex + " to start...");
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+  
+  /**
+   * wait for the cluster to get out of safemode.
    */
   public void waitClusterUp() {
     if (numDataNodes > 0) {
@@ -547,13 +605,7 @@ public class MiniDFSCluster {
     // If minicluster's name node is null assume that the conf has been
     // set with the right address:port of the name node.
     //
-    if (nameNode != null) { // set conf from the name node
-      InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); 
-      int nameNodePort = nnAddr.getPort(); 
-      FileSystem.setDefaultUri(conf, 
-                               "hdfs://"+ nnAddr.getHostName() +
-                               ":" + Integer.toString(nameNodePort));
-    }
+    setNamenodeList(conf);
     
     if (racks != null && numDataNodes > racks.length ) {
       throw new IllegalArgumentException( "The length of racks [" + racks.length
@@ -706,6 +758,30 @@ public class MiniDFSCluster {
                    simulatedCapacities);
     
   }
+
+  /**
+   * Finalize the namenode. Block pools corresponding to the namenode are
+   * finalized on the datanode.
+   */
+  private void finalizeNamenode(NameNode nn, Configuration conf) throws Exception {
+    if (nn == null) {
+      throw new IllegalStateException("Attempting to finalize "
+                                      + "Namenode but it is not running");
+    }
+    ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+  }
+  
+  /**
+   * Finalize cluster for the namenode at the given index 
+   * @see MiniDFSCluster#finalizeCluster(Configuration)
+   * @param nnIndex
+   * @param conf
+   * @throws Exception
+   */
+  public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
+    finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
+  }
+
   /**
    * If the NameNode is running, attempt to finalize a previous upgrade.
    * When this method return, the NameNode should be finalized, but
@@ -714,18 +790,28 @@ public class MiniDFSCluster {
    * @throws IllegalStateException if the Namenode is not running.
    */
   public void finalizeCluster(Configuration conf) throws Exception {
-    if (nameNode == null) {
-      throw new IllegalStateException("Attempting to finalize "
-                                      + "Namenode but it is not running");
+    for (NameNodeInfo nnInfo : nameNodes) {
+      if (nnInfo == null) {
+        throw new IllegalStateException("Attempting to finalize "
+            + "Namenode but it is not running");
+      }
+      finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
     }
-    ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
   }
   
   /**
    * Gets the started NameNode.  May be null.
    */
   public NameNode getNameNode() {
-    return nameNode;
+    checkSingleNameNode();
+    return getNameNode(0);
+  }
+  
+  /**
+   * Gets the NameNode for the index.  May be null.
+   */
+  public NameNode getNameNode(int nnIndex) {
+    return nameNodes[nnIndex].nameNode;
   }
   
   /**
@@ -733,7 +819,12 @@ public class MiniDFSCluster {
    * @return {@link FSNamesystem} object.
    */
   public FSNamesystem getNamesystem() {
-    return NameNodeAdapter.getNamesystem(nameNode);
+    checkSingleNameNode();
+    return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
+  }
+  
+  public FSNamesystem getNamesystem(int nnIndex) {
+    return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
   }
 
   /**
@@ -761,21 +852,34 @@ public class MiniDFSCluster {
   /**
    * Gets the rpc port used by the NameNode, because the caller 
    * supplied port is not necessarily the actual port used.
+   * Assumption: cluster has a single namenode
    */     
   public int getNameNodePort() {
-    return nameNode.getNameNodeAddress().getPort();
+    checkSingleNameNode();
+    return getNameNodePort(0);
+  }
+    
+  /**
+   * Gets the rpc port used by the NameNode at the given index, because the
+   * caller supplied port is not necessarily the actual port used.
+   */     
+  public int getNameNodePort(int nnIndex) {
+    return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
   }
     
   /**
-   * Shut down the servers that are up.
+   * Shutdown all the nodes in the cluster.
    */
   public void shutdown() {
     System.out.println("Shutting down the Mini HDFS Cluster");
     shutdownDataNodes();
-    if (nameNode != null) {
-      nameNode.stop();
-      nameNode.join();
-      nameNode = null;
+    for (NameNodeInfo nnInfo : nameNodes) {
+      NameNode nameNode = nnInfo.nameNode;
+      if (nameNode != null) {
+        nameNode.stop();
+        nameNode.join();
+        nameNode = null;
+      }
     }
   }
   
@@ -793,23 +897,35 @@ public class MiniDFSCluster {
   }
 
   /**
-   * Shutdown namenode.
+   * Shutdown all the namenodes.
    */
-  public synchronized void shutdownNameNode() {
-    if (nameNode != null) {
+  public synchronized void shutdownNameNodes() {
+    for (int i = 0; i < nameNodes.length; i++) {
+      shutdownNameNode(i);
+    }
+  }
+  
+  /**
+   * Shutdown the namenode at a given index.
+   */
+  public synchronized void shutdownNameNode(int nnIndex) {
+    NameNode nn = nameNodes[nnIndex].nameNode;
+    if (nn != null) {
       System.out.println("Shutting down the namenode");
-      nameNode.stop();
-      nameNode.join();
-      nameNode = null;
+      nn.stop();
+      nn.join();
+      nameNodes[nnIndex] = null;
     }
   }
 
   /**
-   * Restart namenode.
+   * Restart namenode at a given index.
    */
-  public synchronized void restartNameNode() throws IOException {
-    shutdownNameNode();
-    nameNode = NameNode.createNameNode(new String[] {}, conf);
+  public synchronized void restartNameNode(int nnIndex) throws IOException {
+    Configuration conf = nameNodes[nnIndex].conf;
+    shutdownNameNode(nnIndex);
+    NameNode nn = NameNode.createNameNode(new String[] {}, conf);
+    nameNodes[nnIndex] = new NameNodeInfo(nn, conf);
     waitClusterUp();
     System.out.println("Restarted the namenode");
     int failedCount = 0;
@@ -978,19 +1094,32 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Returns true if the NameNode is running and is out of Safe Mode.
+   * Returns true if the given namenode is running and is out of Safe Mode.
    */
-  public boolean isClusterUp() {
+  public boolean isNameNodeUp(int nnIndex) {
+    NameNode nameNode = nameNodes[nnIndex] == null ? null : nameNodes[nnIndex].nameNode;
     if (nameNode == null) {
       return false;
     }
     long[] sizes = nameNode.getStats();
     boolean isUp = false;
     synchronized (this) {
-      isUp = (!nameNode.isInSafeMode() && sizes[0] != 0);
+      isUp = !(nameNode.isInSafeMode() || sizes[0] == 0);
     }
     return isUp;
   }
+
+  /**
+   * Returns true if all the NameNodes are running and are out of Safe Mode.
+   */
+  public boolean isClusterUp() {
+    for (int index = 0; index < nameNodes.length; index++) {
+      if (!isNameNodeUp(index)) {
+        return false;
+      }
+    }
+    return true;
+  }
   
   /**
    * Returns true if there is at least one DataNode running.
@@ -1003,29 +1132,46 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Get a client handle to the DFS cluster.
+   * Get a client handle to the DFS cluster with a single namenode.
    */
   public FileSystem getFileSystem() throws IOException {
-    return FileSystem.get(conf);
+    checkSingleNameNode();
+    return getFileSystem(0);
   }
   
+  /**
+   * Get a client handle to the DFS cluster for the namenode at given index.
+   */
+  public FileSystem getFileSystem(int nnIndex) throws IOException {
+    return FileSystem.get(getURI(nnIndex), nameNodes[nnIndex].conf);
+  }
 
   /**
    * Get another FileSystem instance that is different from FileSystem.get(conf).
    * This simulating different threads working on different FileSystem instances.
    */
-  public FileSystem getNewFileSystemInstance() throws IOException {
-    return FileSystem.newInstance(conf);
+  public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
+    return FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
+  }
+  
+  /**
+   * @return an HTTP URL for the given namenode
+   */
+  public String getHttpUri(int nnIndex) throws IOException {
+    return "http://"
+        + nameNodes[nnIndex].conf
+            .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
   }
   
   /**
    * @return a {@link HftpFileSystem} object.
    */
-  public HftpFileSystem getHftpFileSystem() throws IOException {
-    final String str = "hftp://"
-        + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+  public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException {
+    String uri = "hftp://"
+        + nameNodes[nnIndex].conf
+            .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     try {
-      return (HftpFileSystem)FileSystem.get(new URI(str), conf); 
+      return (HftpFileSystem)FileSystem.get(new URI(uri), conf);
     } catch (URISyntaxException e) {
       throw new IOException(e);
     }
@@ -1035,14 +1181,14 @@ public class MiniDFSCluster {
    *  @return a {@link HftpFileSystem} object as specified user. 
    */
   public HftpFileSystem getHftpFileSystemAs(final String username,
-      final Configuration conf, final String... groups
-      ) throws IOException, InterruptedException {
+      final Configuration conf, final int nnIndex, final String... groups)
+      throws IOException, InterruptedException {
     final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
         username, groups);
     return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
       @Override
       public HftpFileSystem run() throws Exception {
-        return getHftpFileSystem();
+        return getHftpFileSystem(nnIndex);
       }
     });
   }
@@ -1050,29 +1196,27 @@ public class MiniDFSCluster {
   /**
    * Get the directories where the namenode stores its image.
    */
-  public Collection<URI> getNameDirs() {
-    return FSNamesystem.getNamespaceDirs(conf);
+  public Collection<URI> getNameDirs(int nnIndex) {
+    return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
   }
 
   /**
    * Get the directories where the namenode stores its edits.
    */
-  public Collection<URI> getNameEditsDirs() {
-    return FSNamesystem.getNamespaceEditsDirs(conf);
+  public Collection<URI> getNameEditsDirs(int nnIndex) {
+    return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
   }
 
-  /**
-   * Wait until the cluster is active and running.
-   */
-  public void waitActive() throws IOException {
-    if (nameNode == null) {
+  /** Wait until the given namenode gets registrations from all the datanodes */
+  public void waitActive(int nnIndex) throws IOException {
+    if (nameNodes.length == 0 || nameNodes[nnIndex] == null) {
       return;
     }
     InetSocketAddress addr = new InetSocketAddress("localhost",
-                                                   getNameNodePort());
+        getNameNodePort(nnIndex));
     DFSClient client = new DFSClient(addr, conf);
 
-    // make sure all datanodes have registered and sent heartbeat
+    // ensure all datanodes have registered and sent heartbeat to the namenode
     while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
       try {
         Thread.sleep(100);
@@ -1083,6 +1227,15 @@ public class MiniDFSCluster {
     client.close();
   }
   
+  /**
+   * Wait until the cluster is active and running.
+   */
+  public void waitActive() throws IOException {
+    for (int index = 0; index < nameNodes.length; index++) {
+      waitActive(index);
+    }
+  }
+  
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
     for (DataNodeProperties dn : dataNodes) {
       // If any one of the datanode is down, then do not continue to wait
@@ -1297,4 +1450,59 @@ public class MiniDFSCluster {
     }
     return null;
   }
+  
+  /**
+   * Add a comma-separated list of namenode addresses to the configuration
+   */
+  private void setNamenodeList(Configuration conf) {
+    String list = "";
+    for (int i = 0; i < nameNodes.length; i++) {
+      if (nameNodes[i] == null || nameNodes[i].nameNode == null) {
+        continue;
+      }
+      if (list.length() > 0) {
+        list += ",";
+      }
+      list += getURI(i);
+    }
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, list);
+  }
+  
+  /**
+   * Throw an exception if the MiniDFSCluster is not started with a single
+   * namenode
+   */
+  private void checkSingleNameNode() {
+    if (nameNodes.length != 1) {
+      throw new IllegalArgumentException("Namenode index is needed");
+    }
+  }
+  
+  /**
+   * Add a namenode to the cluster and start it. Configuration of datanodes
+   * in the cluster is refreshed to register with the new namenode.
+   * @return newly started namenode
+   */
+  public NameNode addNameNode(Configuration conf, int namenodePort)
+      throws IOException {
+    int nnIndex = nameNodes.length;
+    int numNameNodes = nameNodes.length + 1;
+    NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
+    System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
+    nameNodes = newlist;
+    createNameNode(nnIndex, conf, numDataNodes, true, namenodePort, true, null,
+        null);
+
+    // Refresh datanodes with the newly started namenode
+    for (DataNodeProperties dn : dataNodes) {
+      DataNode datanode = dn.datanode;
+      Configuration dnConf = datanode.getConf();
+      setNamenodeList(dnConf);
+      datanode.refreshNamenodes(dnConf);
+    }
+
+    // Wait for new namenode to get registrations from all the datanodes
+    waitActive(nnIndex);
+    return nameNodes[nnIndex].nameNode;
+  }
 }
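
To close the loop on the new API, a small sketch of growing a running cluster with the addNameNode method added above; the class name, main wrapper, and port 9929 are illustrative assumptions, not from the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    // Illustrative sketch only; not part of this commit.
    public class AddNameNodeExample {  // hypothetical class name
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        try {
          cluster.waitActive();
          // Formats fresh name dirs, starts the namenode, then refreshes
          // every running datanode so it registers with the new block pool.
          NameNode added = cluster.addNameNode(conf, 9929);
          FileSystem fsNew = cluster.getFileSystem(1);  // client for the new NN
        } finally {
          cluster.shutdown();
        }
      }
    }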

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Mar  1 22:43:39 2011
@@ -488,7 +488,7 @@ public class TestDFSClientRetries extend
     
     public void run() {
       try {
-        fs = cluster.getNewFileSystemInstance();
+        fs = cluster.getNewFileSystemInstance(0);
         
         int bufferSize = len;
         byte[] buf = new byte[bufferSize];

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java Tue Mar  1 22:43:39 2011
@@ -72,7 +72,7 @@ public class TestDatanodeConfig {
   @Test
   public void testDataDirectories() throws IOException {
     File dataDir = new File(BASE_DIR, "data").getCanonicalFile();
-    Configuration conf = cluster.getConfiguration();
+    Configuration conf = cluster.getConfiguration(0);
     // 1. Test unsupported schema. Only "file:" is supported.
     String dnDir = makeURI("shv", null, fileAsURI(dataDir).getPath());
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dnDir);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Tue Mar  1 22:43:39 2011
@@ -354,7 +354,7 @@ public class TestDistributedFileSystem {
         hdfs.setPermission(new Path(dir), new FsPermission((short)0));
         try {
           final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
-          final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+          final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
           hftp2.getFileChecksum(qualified);
           fail();
         } catch(IOException ioe) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java Tue Mar  1 22:43:39 2011
@@ -73,8 +73,8 @@ public class TestFileStatus {
     conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
     cluster = new MiniDFSCluster.Builder(conf).build();
     fs = cluster.getFileSystem();
-    fc = FileContext.getFileContext(cluster.getURI(), conf);
-    hftpfs = cluster.getHftpFileSystem();
+    fc = FileContext.getFileContext(cluster.getURI(0), conf);
+    hftpfs = cluster.getHftpFileSystem(0);
     dfsClient = new DFSClient(NameNode.getAddress(conf), conf);
     file1 = new Path("filestatus.dat");
     writeFile(fs, file1, 1, fileSize, blockSize);
@@ -294,7 +294,7 @@ public class TestFileStatus {
       fs.setPermission(dir, new FsPermission((short)0));
       try {
         final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
-        final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+        final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
         hftp2.getContentSummary(dir);
         fail();
       } catch(IOException ioe) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java Tue Mar  1 22:43:39 2011
@@ -64,7 +64,7 @@ public class TestListFilesInFileContext 
   @BeforeClass
   public static void testSetUp() throws Exception {
     cluster = new MiniDFSCluster.Builder(conf).build();
-    fc = FileContext.getFileContext(cluster.getConfiguration());
+    fc = FileContext.getFileContext(cluster.getConfiguration(0));
     fc.delete(TEST_DIR, true);
   }
   

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java Tue Mar  1 22:43:39 2011
@@ -62,7 +62,7 @@ public class TestListPathServlet {
     final String str = "hftp://"
         + CONF.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     hftpURI = new URI(str);
-    hftpFs = cluster.getHftpFileSystem();
+    hftpFs = cluster.getHftpFileSystem(0);
   }
 
   @AfterClass
@@ -104,7 +104,7 @@ public class TestListPathServlet {
     checkStatus("/nonexistent/a");
 
     final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
-    final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, "somegroup");
+    final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, 0, "somegroup");
     { //test file not found on hftp 
       final Path nonexistent = new Path("/nonexistent");
       try {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Tue Mar  1 22:43:39 2011
@@ -18,120 +18,30 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
-
-import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Assert;
+import static org.junit.Assert.*;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestDataNodeMultipleRegistrations {
   private static final Log LOG = 
     LogFactory.getLog(TestDataNodeMultipleRegistrations.class);
-  File common_base_dir;
-  String localHost;
   Configuration conf;
 
   @Before
   public void setUp() throws Exception {
-    common_base_dir = new File(MiniDFSCluster.getBaseDirectory());
-    if (common_base_dir != null) {
-      if (common_base_dir.exists() && !FileUtil.fullyDelete(common_base_dir)) {
-        throw new IOException("cannot get directory ready:"
-            + common_base_dir.getAbsolutePath());
-      }
-    }
-
     conf = new HdfsConfiguration();
-    localHost = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface",
-        "default"), conf.get("dfs.datanode.dns.nameserver", "default"));
-
-    localHost = "127.0.0.1";
-    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, localHost);
-  }
-
-  NameNode startNameNode(Configuration conf, int nnPort) throws IOException {
-    // per nn base_dir
-    File base_dir = new File(common_base_dir, Integer.toString(nnPort));
-
-    boolean manageNameDfsDirs = true; // for now
-    boolean format = true; // for now
-    // disable service authorization
-    conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
-        false);
-
-    // Setup the NameNode configuration
-    if (manageNameDfsDirs) {
-      String name = fileAsURI(new File(base_dir, "name1")) + ","
-          + fileAsURI(new File(base_dir, "name2"));
-      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, name);
-      String sname = fileAsURI(new File(base_dir, "namesecondary1")) + ","
-          + fileAsURI(new File(base_dir, "namesecondary2"));
-      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, sname);
-    }
-
-    // Format and clean out DataNode directories
-    if (format) {
-      GenericTestUtils.formatNamenode(conf);
-    }
-
-    // Start the NameNode
-    String[] args = new String[] {};
-    return NameNode.createNameNode(args, conf);
-  }
-
-  public DataNode startDataNode(Configuration conf) throws IOException {
-    Configuration dnConf = new HdfsConfiguration(conf);
-    boolean manageDfsDirs = true; // for now
-    File data_dir = new File(common_base_dir, "data");
-    if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
-      throw new IOException("Cannot remove data directory: " + data_dir);
-    }
-
-    if (manageDfsDirs) {
-      File dir1 = new File(data_dir, "data1");
-      File dir2 = new File(data_dir, "data2");
-      dir1.mkdirs();
-      dir2.mkdirs();
-      if (!dir1.isDirectory() || !dir2.isDirectory()) {
-        throw new IOException(
-            "Mkdirs failed to create directory for DataNode: " + dir1 + " or "
-                + dir2);
-      }
-      String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
-      dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
-      conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
-    }
-    LOG.debug("Starting DataNode " + " with "
-        + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
-        + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
-
-    String[] dnArgs = null; // for now
-    DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
-    if (dn == null)
-      throw new IOException("Cannot start DataNode in "
-          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
-
-    dn.runDatanodeDaemon();
-    return dn;
   }
 
   /**
@@ -142,89 +52,70 @@ public class TestDataNodeMultipleRegistr
    */
   @Test
   public void test2NNRegistration() throws IOException {
-    NameNode nn1, nn2;
-    // figure out host name for DataNode
-    int nnPort = 9928;
-    String nnURL1 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
-    FileSystem.setDefaultUri(conf, nnURL1);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50070");
-    nn1 = startNameNode(conf, nnPort);
-    
-    nnPort = 9929;
-    String nnURL2 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
-    FileSystem.setDefaultUri(conf, nnURL2);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50071");
-    nn2 = startNameNode(conf, nnPort);
-    
-    Assert.assertNotNull("cannot create nn1", nn1);
-    Assert.assertNotNull("cannot create nn2", nn2);
-    
-    String bpid1 = nn1.getFSImage().getBlockPoolID();
-    String bpid2 = nn2.getFSImage().getBlockPoolID();
-    String cid1 = nn1.getFSImage().getClusterID();
-    String cid2 = nn2.getFSImage().getClusterID();
-    int lv1 = nn1.getFSImage().getLayoutVersion();
-    int lv2 = nn2.getFSImage().getLayoutVersion();
-    int ns1 = nn1.getFSImage().namespaceID;
-    int ns2 = nn2.getFSImage().namespaceID;
-    Assert.assertNotSame("namespace ids should be different", ns1, ns2);
-    LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
-        + ";uri=" + nn1.getNameNodeAddress());
-    LOG.info("nn2: lv=" + lv2 + ";cid=" + cid2 + ";bpid=" + bpid2
-        + ";uri=" + nn2.getNameNodeAddress());
-
-    // now start the datanode...
-    String nns = nnURL1 + "," + nnURL2;
-    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
-    DataNode dn = startDataNode(conf);
-    Assert.assertNotNull("failed to create DataNode", dn);
-    waitDataNodeUp(dn);
-
-    
-    // check number of volumes in fsdataset
-    Collection<VolumeInfo> volInfos = ((FSDataset) dn.data).getVolumeInfo();
-    Assert.assertNotNull("No volumes in the fsdataset", volInfos);
-    int i=0;
-    for(VolumeInfo vi : volInfos) {
-      LOG.info("vol " + i++ + ";dir=" + vi.directory + ";fs= " + vi.freeSpace);
-    }
-    // number of volumes should be 2 - [data1, data2]
-    Assert.assertEquals("number of volumes is wrong",2, volInfos.size());
-    
-    
-    for (BPOfferService bpos : dn.getAllBpOs()) {
-      LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
-          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
-    }
-    
-    BPOfferService bpos1 = dn.getAllBpOs()[0];
-    BPOfferService bpos2 = dn.getAllBpOs()[1];
-    
-    //The order of bpos is not guaranteed, so fix the order
-    if (bpos1.nnAddr.equals(nn2.getNameNodeAddress())) {
-      BPOfferService tmp = bpos1;
-      bpos1 = bpos2;
-      bpos2 = tmp;
-    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2)
+        .nameNodePort(9928).build();
+    try {
+      cluster.waitActive();
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      assertNotNull("cannot create nn1", nn1);
+      assertNotNull("cannot create nn2", nn2);
+
+      String bpid1 = nn1.getFSImage().getBlockPoolID();
+      String bpid2 = nn2.getFSImage().getBlockPoolID();
+      String cid1 = nn1.getFSImage().getClusterID();
+      String cid2 = nn2.getFSImage().getClusterID();
+      int lv1 = nn1.getFSImage().getLayoutVersion();
+      int lv2 = nn2.getFSImage().getLayoutVersion();
+      int ns1 = nn1.getFSImage().namespaceID;
+      int ns2 = nn2.getFSImage().namespaceID;
+      assertNotSame("namespace ids should be different", ns1, ns2);
+      LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1 + ";uri="
+          + nn1.getNameNodeAddress());
+      LOG.info("nn2: lv=" + lv2 + ";cid=" + cid2 + ";bpid=" + bpid2 + ";uri="
+          + nn2.getNameNodeAddress());
+
+      // check number of volumes in fsdataset
+      DataNode dn = cluster.getDataNodes().get(0);
+      Collection<VolumeInfo> volInfos = ((FSDataset) dn.data).getVolumeInfo();
+      assertNotNull("No volumes in the fsdataset", volInfos);
+      int i = 0;
+      for (VolumeInfo vi : volInfos) {
+        LOG.info("vol " + i++ + ";dir=" + vi.directory + ";fs= " + vi.freeSpace);
+      }
+      // number of volumes should be 2 - [data1, data2]
+      assertEquals("number of volumes is wrong", 2, volInfos.size());
+
+      for (BPOfferService bpos : dn.getAllBpOs()) {
+        LOG.info("reg: bpid=" + bpos.getBlockPoolId() + "; name="
+            + bpos.bpRegistration.name + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
+      }
+
+      BPOfferService bpos1 = dn.getAllBpOs()[0];
+      BPOfferService bpos2 = dn.getAllBpOs()[1];
 
-    Assert.assertEquals("wrong nn address", bpos1.nnAddr, nn1
-        .getNameNodeAddress());
-    Assert.assertEquals("wrong nn address", bpos2.nnAddr, nn2
-        .getNameNodeAddress());
-    Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
-    Assert.assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
-    Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
-    Assert.assertEquals("cid should be same", cid2, cid1);
-    Assert.assertEquals("namespace should be same", bpos1.bpNSInfo.namespaceID,
-        ns1);
-    Assert.assertEquals("namespace should be same", bpos2.bpNSInfo.namespaceID,
-        ns2);
-
-    dn.shutdown();
-    shutdownNN(nn1);
-    nn1 = null;
-    shutdownNN(nn2);
-    nn2 = null;
+      // The order of bpos is not guaranteed, so fix the order
+      if (bpos1.nnAddr.equals(nn2.getNameNodeAddress())) {
+        BPOfferService tmp = bpos1;
+        bpos1 = bpos2;
+        bpos2 = tmp;
+      }
+
+      assertEquals("wrong nn address", bpos1.nnAddr,
+          nn1.getNameNodeAddress());
+      assertEquals("wrong nn address", bpos2.nnAddr,
+          nn2.getNameNodeAddress());
+      assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
+      assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
+      assertEquals("wrong cid", dn.getClusterId(), cid1);
+      assertEquals("cid should be same", cid2, cid1);
+      assertEquals("namespace should be same",
+          bpos1.bpNSInfo.namespaceID, ns1);
+      assertEquals("namespace should be same",
+          bpos2.bpNSInfo.namespaceID, ns2);
+    } finally {
+      cluster.shutdown();
+    }
   }
 
   /**
@@ -234,86 +125,53 @@ public class TestDataNodeMultipleRegistr
    */
   @Test
   public void testFedSingleNN() throws IOException {
-    NameNode nn1;
-    int nnPort = 9927;
-    // figure out host name for DataNode
-    String nnURL = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
-
-    FileSystem.setDefaultUri(conf, nnURL);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50070");
-    nn1 = startNameNode(conf, nnPort);
-    Assert.assertNotNull("cannot create nn1", nn1);
-
-    String bpid1 = nn1.getFSImage().getBlockPoolID();
-    String cid1 = nn1.getFSImage().getClusterID();
-    int lv1 = nn1.getFSImage().getLayoutVersion();
-    LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
-        + ";uri=" + nn1.getNameNodeAddress());
-
-    // now start the datanode...
-    String nns = nnURL;
-    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
-
-    DataNode dn = startDataNode(conf);
-    Assert.assertNotNull("failed to create DataNode", dn);
-
-    waitDataNodeUp(dn);
-    // check number of vlumes in fsdataset
-    Collection<VolumeInfo> volInfos = ((FSDataset) dn.data).getVolumeInfo();
-    Assert.assertNotNull("No volumes in the fsdataset", volInfos);
-    int i=0;
-    for(VolumeInfo vi : volInfos) {
-      LOG.info("vol " + i++ + ";dir=" + vi.directory + ";fs= " + vi.freeSpace);
-    }
-    // number of volumes should be 2 - [data1, data2]
-    Assert.assertEquals("number of volumes is wrong",2, volInfos.size());
-    
-
-    for (BPOfferService bpos : dn.getAllBpOs()) {
-      LOG.debug("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid="
-          + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
-    }
-
-    // try block report
-    BPOfferService bpos1 = dn.getAllBpOs()[0];
-    bpos1.lastBlockReport = 0;
-    DatanodeCommand cmd = bpos1.blockReport();
-
-    Assert.assertNotNull("cmd is null", cmd);
-
-    Assert.assertEquals("wrong nn address", bpos1.nnAddr, nn1
-        .getNameNodeAddress());
-    Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
-    Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
-
-    dn.shutdown();
-    dn = null;
-    shutdownNN(nn1);
-    nn1 = null;
-  }
-
-  void shutdownNN(NameNode nn) {
-    if (nn == null) {
-      return;
-    }
-    nn.stop();
-    nn.join();
-  }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nameNodePort(9927).build();
+    try {
+      NameNode nn1 = cluster.getNameNode();
+      assertNotNull("cannot create nn1", nn1);
+
+      String bpid1 = nn1.getFSImage().getBlockPoolID();
+      String cid1 = nn1.getFSImage().getClusterID();
+      int lv1 = nn1.getFSImage().getLayoutVersion();
+      LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1 + ";uri="
+          + nn1.getNameNodeAddress());
+
+      // check number of volumes in fsdataset
+      DataNode dn = cluster.getDataNodes().get(0);
+      Collection<VolumeInfo> volInfos = ((FSDataset) dn.data).getVolumeInfo();
+      assertNotNull("No volumes in the fsdataset", volInfos);
+      int i = 0;
+      for (VolumeInfo vi : volInfos) {
+        LOG.info("vol " + i++ + ";dir=" + vi.directory + ";fs=" + vi.freeSpace);
+      }
+      // number of volumes should be 2 - [data1, data2]
+      assertEquals("number of volumes is wrong", 2, volInfos.size());
 
-  public boolean isDnUp(DataNode dn) {
-    boolean up = dn.getAllBpOs().length > 0;
-    for (BPOfferService bpos : dn.getAllBpOs()) {
-      up = up && bpos.initialized();
-    }
-    return up;
-  }
+      for (BPOfferService bpos : dn.getAllBpOs()) {
+        LOG.info("reg: bpid=" + bpos.getBlockPoolId() + "; name=" + bpos.bpRegistration.name
+            + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
+      }
 
-  public void waitDataNodeUp(DataNode dn) {
-    // should be something smart
-    while (!isDnUp(dn)) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ie) {
+      // try block report
+      BPOfferService bpos1 = dn.getAllBpOs()[0];
+      bpos1.lastBlockReport = 0;
+      DatanodeCommand cmd = bpos1.blockReport();
+
+      assertNotNull("cmd is null", cmd);
+
+      assertEquals("wrong nn address", bpos1.nnAddr,
+          nn1.getNameNodeAddress());
+      assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
+      assertEquals("wrong cid", dn.getClusterId(), cid1);
+      cluster.shutdown();
+      
+      // Ensure all the BPOfferService threads are shut down
+      assertEquals(0, dn.getAllBpOs().length);
+      cluster = null;
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
       }
     }
   }

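The rewrite above replaces the hand-rolled startNameNode/startDataNode/waitDataNodeUp helpers with the MiniDFSCluster builder. For readers following the API change, a minimal sketch of bringing up a federated cluster with it, using only calls that appear in this commit (numNameNodes, nameNodePort, getNameNode(int), waitActive, shutdown); the port value and wrapper class are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class FederatedClusterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Two federated namenodes; the datanode(s) serve both block pools.
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numNameNodes(2)        // builder option exercised by this commit
            .nameNodePort(9928)     // illustrative port; any free port works
            .build();
        try {
          cluster.waitActive();
          for (int i = 0; i < 2; i++) {
            // Namenodes are now addressed by index.
            System.out.println(cluster.getNameNode(i).getNameNodeAddress());
          }
        } finally {
          cluster.shutdown();
        }
      }
    }
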
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java Tue Mar  1 22:43:39 2011
@@ -18,96 +18,62 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.junit.Before;
 import org.junit.Test;
 
-
+/**
+ * Tests the datanode's ability to refresh its list of namenodes.
+ */
 public class TestRefreshNamenodes {
-  
-  private String localhost = "127.0.0.1";
   private int nnPort1 = 2221;
-  private int nnPort2 = 2222;
-  private int nnPort3 = 2223;
-  private int nnPort4 = 2224;
-  private final String nnURL1 = "hdfs://" + localhost + ":" + nnPort1;
-  private final String nnURL2 = "hdfs://" + localhost + ":" + nnPort2;
-  private final String nnURL3 = "hdfs://" + localhost + ":" + nnPort3;
-  private final String nnURL4 = "hdfs://" + localhost + ":" + nnPort4;
-  private NameNode nn1 = null;
-  private NameNode nn2 = null;
-  private NameNode nn3 = null;
-  private NameNode nn4 = null;
-  private TestDataNodeMultipleRegistrations tdnmr = null;
-  
-  @Before
-  public void setUp() throws Exception {
-    tdnmr = new TestDataNodeMultipleRegistrations();
-    tdnmr.setUp();
-  }
-  
-  private void startNamenodes() throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1:0");
-
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50071");
-    FileSystem.setDefaultUri(conf, nnURL1);
-    nn1 = tdnmr.startNameNode(conf, nnPort1);
-
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50072");
-    FileSystem.setDefaultUri(conf, nnURL2);
-    nn2 = tdnmr.startNameNode(conf, nnPort2);
-   
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50073");
-    FileSystem.setDefaultUri(conf, nnURL3);
-    nn3 = tdnmr.startNameNode(conf, nnPort3);
-    
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50074");
-    FileSystem.setDefaultUri(conf, nnURL4);
-    nn4 = tdnmr.startNameNode(conf, nnPort4);
-  }
+  private int nnPort2 = 2224;
+  private int nnPort3 = 2227;
+  private int nnPort4 = 2230;
 
   @Test
   public void testRefreshNamenodes() throws IOException {
+    // Start cluster with a single NN and DN
     Configuration conf = new Configuration();
-    
-    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 +","+ nnURL2);   
-    startNamenodes();
-    
-    DataNode dn = tdnmr.startDataNode(conf);
-    tdnmr.waitDataNodeUp(dn);
-
-    assertEquals(2, dn.getAllBpOs().length);
-    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 + "," + nnURL3
-        + "," + nnURL4);
-    dn.refreshNamenodes(conf);
-    tdnmr.waitDataNodeUp(dn);
-    BPOfferService[] bpoList = dn.getAllBpOs();
-    assertEquals(3, bpoList.length);
-
-    InetSocketAddress nn_addr_1 = bpoList[0].nnAddr;
-    InetSocketAddress nn_addr_2 = bpoList[1].nnAddr;
-    InetSocketAddress nn_addr_3 = bpoList[2].nnAddr;
-    
-    assertTrue(nn_addr_1.equals(nn1.getNameNodeAddress()));
-    assertTrue(nn_addr_2.equals(nn3.getNameNodeAddress()));
-    assertTrue(nn_addr_3.equals(nn4.getNameNodeAddress()));
-
-    dn.shutdown();
-    tdnmr.shutdownNN(nn1);
-    tdnmr.shutdownNN(nn2);
-    tdnmr.shutdownNN(nn3);
-    tdnmr.shutdownNN(nn4);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numNameNodes(1)
+          .nameNodePort(nnPort1).build();
+
+      DataNode dn = cluster.getDataNodes().get(0);
+      assertEquals(1, dn.getAllBpOs().length);
+
+      cluster.addNameNode(conf, nnPort2);
+      assertEquals(2, dn.getAllBpOs().length);
+
+      cluster.addNameNode(conf, nnPort3);
+      assertEquals(3, dn.getAllBpOs().length);
+
+      cluster.addNameNode(conf, nnPort4);
+
+      BPOfferService[] bpoList = dn.getAllBpOs();
+      // Ensure each BPOfferService in the datanode corresponds to
+      // a namenode in the cluster
+      for (int i = 0; i < 4; i++) {
+        InetSocketAddress addr = cluster.getNameNode(i).getNameNodeAddress();
+        boolean found = false;
+        for (int j = 0; j < bpoList.length; j++) {
+          if (bpoList[j] != null && addr.equals(bpoList[j].nnAddr)) {
+            found = true;
+            bpoList[j] = null; // Erase the address that matched
+            break;
+          }
+        }
+        assertTrue("NameNode address " + addr + " is not found.", found);
+      }
+    } finally {
+      if (cluster != null) cluster.shutdown();
+    }
   }
 }

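TestRefreshNamenodes now drives the same functionality through MiniDFSCluster.addNameNode(conf, port), which starts the extra namenode and triggers the datanode refresh in one call. A condensed sketch of the core pattern, reusing only calls visible in the hunk above (ports are illustrative):

    @Test
    public void growFederationSketch() throws IOException {
      Configuration conf = new Configuration();
      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
          .numNameNodes(1).nameNodePort(2221).build();
      try {
        DataNode dn = cluster.getDataNodes().get(0);
        assertEquals(1, dn.getAllBpOs().length); // one BPOfferService per namenode
        cluster.addNameNode(conf, 2224);         // starts the NN and refreshes the DN
        assertEquals(2, dn.getAllBpOs().length);
      } finally {
        cluster.shutdown();
      }
    }
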
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Tue Mar  1 22:43:39 2011
@@ -415,7 +415,7 @@ public class TestBlockTokenWithDFS exten
       assertTrue(cluster.restartDataNodes(true));
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
-      cluster.shutdownNameNode();
+      cluster.shutdownNameNode(0);
 
       // confirm tokens cached in in1 are still valid
       lblocks = DFSTestUtil.getAllBlocks(in1);
@@ -451,8 +451,8 @@ public class TestBlockTokenWithDFS exten
        */
 
       // restart the namenode and then shut it down for test
-      cluster.restartNameNode();
-      cluster.shutdownNameNode();
+      cluster.restartNameNode(0);
+      cluster.shutdownNameNode(0);
 
       // verify blockSeekTo() still works (forced to use cached tokens)
       in1.seek(0);
@@ -471,13 +471,13 @@ public class TestBlockTokenWithDFS exten
        */
 
       // restore the cluster and restart the datanodes for test
-      cluster.restartNameNode();
+      cluster.restartNameNode(0);
       assertTrue(cluster.restartDataNodes(true));
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
 
       // shutdown namenode so that DFSClient can't get new tokens from namenode
-      cluster.shutdownNameNode();
+      cluster.shutdownNameNode(0);
 
       // verify blockSeekTo() fails (cached tokens become invalid)
       in1.seek(0);
@@ -486,7 +486,7 @@ public class TestBlockTokenWithDFS exten
       assertFalse(checkFile2(in3));
 
       // restart the namenode to allow DFSClient to re-fetch tokens
-      cluster.restartNameNode();
+      cluster.restartNameNode(0);
       // verify blockSeekTo() works again (by transparently re-fetching
       // tokens from namenode)
       in1.seek(0);

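The remaining hunks in this commit are mechanical: the single-namenode accessors on MiniDFSCluster gain a namenode index, with 0 addressing the first (and, in non-federated tests, only) namenode. The before/after shape, shown here as a fragment using only methods visible in the hunks below:

    // Old single-namenode API (pre-federation):
    //   cluster.shutdownNameNode();
    //   cluster.restartNameNode();
    //   Collection<URI> editsDirs = cluster.getNameEditsDirs();
    // New indexed API from this commit:
    cluster.shutdownNameNode(0);
    cluster.restartNameNode(0);
    Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
    Collection<URI> nameDirs = cluster.getNameDirs(0);
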
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java Tue Mar  1 22:43:39 2011
@@ -95,7 +95,7 @@ public class TestCheckPointForSecurityTo
       String[] args = new String[]{"-saveNamespace"};
 
       // verify that the edits file is NOT empty
-      Collection<URI> editsDirs = cluster.getNameEditsDirs();
+      Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
       for(URI uri : editsDirs) {
         File ed = new File(uri.getPath());
         Assert.assertTrue(new File(ed, "current/edits").length() > Integer.SIZE/Byte.SIZE);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Tue Mar  1 22:43:39 2011
@@ -655,7 +655,7 @@ public class TestCheckpoint extends Test
       //
       assertTrue(!fileSys.exists(file1));
       assertTrue(!fileSys.exists(file2));
-      namedirs = cluster.getNameDirs();
+      namedirs = cluster.getNameDirs(0);
 
       //
       // Create file1
@@ -757,7 +757,7 @@ public class TestCheckpoint extends Test
       writeFile(fs, file, replication);
       checkFile(fs, file, replication);
       // verify that the edits file is NOT empty
-      Collection<URI> editsDirs = cluster.getNameEditsDirs();
+      Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
       for(URI uri : editsDirs) {
         File ed = new File(uri.getPath());
         assertTrue(new File(ed, "current/edits").length() > Integer.SIZE/Byte.SIZE);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Tue Mar  1 22:43:39 2011
@@ -101,7 +101,7 @@ public class TestEditLog extends TestCas
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();
   
-      for (Iterator<URI> it = cluster.getNameDirs().iterator(); it.hasNext(); ) {
+      for (Iterator<URI> it = cluster.getNameDirs(0).iterator(); it.hasNext(); ) {
         File dir = new File(it.next().getPath());
         System.out.println(dir);
       }

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Tue Mar  1 22:43:39 2011
@@ -97,7 +97,7 @@ public class TestSecurityTokenEditLog ex
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();
   
-      for (Iterator<URI> it = cluster.getNameDirs().iterator(); it.hasNext(); ) {
+      for (Iterator<URI> it = cluster.getNameDirs(0).iterator(); it.hasNext(); ) {
         File dir = new File(it.next().getPath());
         System.out.println(dir);
       }

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java?rev=1076040&r1=1076039&r2=1076040&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java Tue Mar  1 22:43:39 2011
@@ -128,7 +128,7 @@ public class TestOfflineImageViewer exte
       cluster.getNameNode().saveNamespace();
       
       // Determine location of fsimage file
-      URI [] files = cluster.getNameDirs().toArray(new URI[0]);
+      URI [] files = cluster.getNameDirs(0).toArray(new URI[0]);
       orig =  new File(files[0].getPath(), "current/fsimage");
       
       if(!orig.exists())


