hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r522597 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/dfs/ src/test/or...
Date Mon, 26 Mar 2007 19:13:07 GMT
Author: cutting
Date: Mon Mar 26 12:13:04 2007
New Revision: 522597

URL: http://svn.apache.org/viewvc?view=rev&rev=522597
Log:
HADOOP-1085.  Improve port selection in test code.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Mar 26 12:13:04 2007
@@ -40,6 +40,11 @@
 12. HADOOP-1131.  Add a closeAll() static method to FileSystem.
     (Philippe Gassmann via tomwhite)
 
+13. HADOOP-1085.  Improve port selection in HDFS and MapReduce test
+    code.  Ports are now selected by the OS during testing rather than
+    by probing for free ports, improving test reliability.
+    (Arun C Murthy via cutting)
+
 
 Release 0.12.2 - 2007-23-17
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Mon Mar 26 12:13:04 2007
@@ -218,7 +218,7 @@
     MiniDFSCluster cluster = null;
     try {
       if(overrideFS == null) {
-        cluster = new MiniDFSCluster(NAME_PORT, conf_, false);
+        cluster = new MiniDFSCluster(conf_, 1, true, null);
         fs_ = cluster.getFileSystem();
       } else {
         System.out.println("overrideFS: " + overrideFS);

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Mon Mar 26 12:13:04 2007
@@ -52,16 +52,15 @@
   {
     try {
       boolean mayExit = false;
-      int jobTrackerPort = 60050;
       MiniMRCluster mr = null;
       MiniDFSCluster dfs = null; 
       FileSystem fileSys = null;
       try{
         Configuration conf = new Configuration();
-        dfs = new MiniDFSCluster(8050, conf, false);
+        dfs = new MiniDFSCluster(conf, 1, true, null);
         fileSys = dfs.getFileSystem();
         String namenode = fileSys.getName();
-        mr  = new MiniMRCluster(jobTrackerPort, 60060, 1, namenode, true, 3);
+        mr  = new MiniMRCluster(1, namenode, 3);
         // During tests, the default Configuration will use a local mapred
         // So don't specify -config or -cluster
         String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Mar 26 12:13:04 2007
@@ -274,6 +274,11 @@
         this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
         this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
         this.infoServer.start();
+        
+        // The web-server port can be ephemeral... ensure we have the correct info
+        this.infoPort = this.infoServer.getPort();
+        conf.set("dfs.info.port", this.infoPort); 
+        LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
     }
 
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Mar 26 12:13:04 2007
@@ -87,6 +87,8 @@
     private Thread emptier;
     private int handlerCount = 2;
     
+    private InetSocketAddress nameNodeAddress = null;
+    
     /** only used for testing purposes  */
     private boolean stopRequested = false;
 
@@ -175,19 +177,29 @@
      */
     private void init(File[] dirs, String hostname, int port, 
                       Configuration conf) throws IOException {
-      this.namesystem = new FSNamesystem(dirs, hostname, port, this, conf);
       this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
       this.server = RPC.getServer(this, hostname, port, handlerCount, 
                                   false, conf);
       this.server.start();      
 
+      // The rpc-server port can be ephemeral... ensure we have the correct info
+      this.nameNodeAddress = this.server.getListenerAddress(); 
+      conf.set("fs.default.name", new String(nameNodeAddress.getHostName() + ":" + nameNodeAddress.getPort()));
+      LOG.info("Namenode up at: " + this.nameNodeAddress);
+
+      this.namesystem = new FSNamesystem(dirs, this.nameNodeAddress.getHostName(), this.nameNodeAddress.getPort(), this, conf);
+
       this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
       this.emptier.setDaemon(true);
       this.emptier.start();
     }
     
     /**
-     * Create a NameNode at the default location
+     * Create a NameNode at the default location.
+     * 
+     * The conf will be modified to reflect the actual ports on which 
+     * the NameNode is up and running if the user passes the port as
+     * <code>zero</code> in the conf.
      */
     public NameNode(Configuration conf) throws IOException {
       InetSocketAddress addr = 
@@ -197,6 +209,10 @@
 
     /**
      * Create a NameNode at the specified location and start it.
+     * 
+     * The conf will be modified to reflect the actual ports on which 
+     * the NameNode is up and running if the user passes the port as
+     * <code>zero</code>.  
      */
     public NameNode(File[] dirs, String bindAddress, int port, Configuration conf) throws IOException {
        init(dirs, bindAddress, port, conf);
@@ -697,6 +713,14 @@
       return namesystem.getFsEditName();
     }
 
+    /**
+     * Returns the address on which the NameNodes is listening to.
+     * @return the address on which the NameNodes is listening to.
+     */
+    public InetSocketAddress getNameNodeAddress() {
+      return nameNodeAddress;
+    }
+    
     /**
      */
     public static void main(String argv[]) throws Exception {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Mon Mar 26 12:13:04 2007
@@ -279,7 +279,8 @@
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) {
+  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
+  throws IOException {
     return getServer(instance, bindAddress, port, 1, false, conf);
   }
 
@@ -287,7 +288,8 @@
    * port and address. */
   public static Server getServer(final Object instance, final String bindAddress, final int port,
                                  final int numHandlers,
-                                 final boolean verbose, Configuration conf) {
+                                 final boolean verbose, Configuration conf) 
+  throws IOException {
     return new Server(instance, conf, bindAddress,port, numHandlers, verbose);
   }
 
@@ -303,7 +305,8 @@
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) {
+    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+    throws IOException {
       this(instance, conf,  bindAddress, port, 1, false);
     }
 
@@ -316,7 +319,7 @@
      * @param verbose whether each call should be logged
      */
     public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, boolean verbose) {
+                  int numHandlers, boolean verbose) throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, conf);
       this.instance = instance;
       this.implementation = instance.getClass();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Mar 26 12:13:04 2007
@@ -182,6 +182,7 @@
 
       // Bind the server socket to the local host and port
       acceptChannel.socket().bind(address, backlogLength);
+      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       // create a selector;
       selector= Selector.open();
 
@@ -313,6 +314,10 @@
       }
     }
 
+    InetSocketAddress getAddress() {
+      return new InetSocketAddress(acceptChannel.socket().getInetAddress(), acceptChannel.socket().getLocalPort());
+    }
+    
     void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
       Connection c = null;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
@@ -599,7 +604,8 @@
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) {
+  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) 
+  throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
@@ -611,6 +617,10 @@
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+    
+    // Start the listener here and let it bind to the port
+    listener = new Listener();
+    this.port = listener.getAddress().getPort();    
   }
 
   /** Sets the timeout used for network i/o. */
@@ -618,7 +628,6 @@
 
   /** Starts the service.  Must be called before any calls will be handled. */
   public synchronized void start() throws IOException {
-    listener = new Listener();
     listener.start();
     
     for (int i = 0; i < handlerCount; i++) {
@@ -645,6 +654,14 @@
     }
   }
 
+  /**
+   * Return the socket (ip+port) on which the RPC server is listening to.
+   * @return the socket (ip+port) on which the RPC server is listening to.
+   */
+  public synchronized InetSocketAddress getListenerAddress() {
+    return listener.getAddress();
+  }
+  
   /** Called for each call. */
   public abstract Writable call(Writable param) throws IOException;
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Mar 26 12:13:04 2007
@@ -84,6 +84,17 @@
 
     private static JobTracker tracker = null;
     private static boolean runTracker = true;
+    
+    /**
+     * Start the JobTracker with given configuration.
+     * 
+     * The conf will be modified to reflect the actual ports on which 
+     * the JobTracker is up and running if the user passes the port as
+     * <code>zero</code>.
+     *   
+     * @param conf configuration for the JobTracker.
+     * @throws IOException
+     */
     public static void startTracker(Configuration conf) throws IOException {
       if (tracker != null)
         throw new IOException("JobTracker already running.");
@@ -630,6 +641,15 @@
         this.initJobsThread = new Thread(this.initJobs, "initJobs");
         this.initJobsThread.start();
         expireLaunchingTaskThread.start();
+        
+        // The rpc/web-server ports can be ephemeral ports... 
+        // ... ensure we have the correct info
+        this.port = interTrackerServer.getListenerAddress().getPort();
+        this.conf.set("mapred.job.tracker", new String(this.localMachine + ":" + this.port));
+        LOG.info("JobTracker up at: " + this.port);
+        this.infoPort = this.infoServer.getPort();
+        this.conf.set("mapred.job.tracker.info.port", this.infoPort); 
+        LOG.info("JobTracker webserver: " + this.infoServer.getPort());
     }
 
     public static InetSocketAddress getAddress(Configuration conf) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 26 12:13:04 2007
@@ -306,6 +306,12 @@
             }
         
         }
+        // The rpc-server port can be ephemeral... 
+        // ... ensure we have the correct info
+        this.taskReportPort = taskReportServer.getListenerAddress().getPort();
+        this.fConf.setInt("mapred.task.tracker.report.port", this.taskReportPort);
+        LOG.info("TaskTracker up at: " + this.taskReportPort);
+
         this.taskTrackerName = "tracker_" + 
                                localHostname + ":" + taskReportPort;
         LOG.info("Starting tracker " + taskTrackerName);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Mon Mar 26 12:13:04 2007
@@ -37,8 +37,6 @@
   private Thread dataNodeThreads[];
   private NameNodeRunner nameNode;
   private DataNodeRunner dataNodes[];
-  static public int MAX_RETRIES  = 10;
-  static public int MAX_RETRIES_PER_PORT = 10;
 
   private int nameNodePort = 0;
   private int nameNodeInfoPort = 0;
@@ -48,15 +46,33 @@
    */
   class NameNodeRunner implements Runnable {
     private NameNode node;
+    private volatile boolean isInitialized = false;
+    private boolean isCrashed = false;
     private boolean isRunning = true;
+
+    public InetSocketAddress getAddress() {
+      return node.getNameNodeAddress();
+    }
+    
+    synchronized public boolean isInitialized() {
+      return isInitialized;
+    }
     
+    synchronized public boolean isCrashed() {
+      return isCrashed;
+    }
+
     public boolean isUp() {
       if (node == null) {
         return false;
       }
       try {
         long[] sizes = node.getStats();
-        return !node.isInSafeMode() && sizes[0] != 0;
+        boolean isUp = false;
+        synchronized (this) {
+          isUp = (isInitialized && !node.isInSafeMode() && sizes[0] != 0);
+        }
+        return isUp;
       } catch (IOException ie) {
         return false;
       }
@@ -71,11 +87,15 @@
           if( isRunning ) {
             node = new NameNode(conf);
           }
+          isInitialized = true;
         }
       } catch (Throwable e) {
         node = null;
         System.err.println("Name node crashed:");
         e.printStackTrace();
+        synchronized (this) {
+          isCrashed = true;
+        }
       }
     }
     
@@ -155,18 +175,25 @@
     }
   }
 
+  public MiniDFSCluster(Configuration conf,
+          int nDatanodes,
+          boolean formatNamenode,
+          String[] racks) throws IOException {
+    this(0, conf, nDatanodes, false, formatNamenode, racks);
+  }
+  
   /**
    * Create the config and start up the servers.  If either the rpc or info port is already 
    * in use, we will try new ports.
    * @param namenodePort suggestion for which rpc port to use.  caller should use 
    *                     getNameNodePort() to get the actual port used.
    * @param dataNodeFirst should the datanode be brought up before the namenode?
+   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
    */
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         boolean dataNodeFirst) throws IOException {
-    this(namenodePort, conf, 1, dataNodeFirst, true, 
-         MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
+    this(namenodePort, conf, 1, dataNodeFirst, true, null);
   }
 
   /**
@@ -174,13 +201,13 @@
    * in use, we will try new ports.
    * @param namenodePort suggestion for which rpc port to use.  caller should use 
    *                     getNameNodePort() to get the actual port used.
+   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}                     
    */
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         int numRetries,
                         int numRetriesPerPort) throws IOException {
-    this(namenodePort, conf, 0, false, false, 
-         numRetries, numRetriesPerPort, null);
+    this(namenodePort, conf, 0, false, false, null);  
   }
 
   /**
@@ -190,13 +217,13 @@
    *                     getNameNodePort() to get the actual port used.
    * @param nDatanodes Number of datanodes   
    * @param dataNodeFirst should the datanode be brought up before the namenode?
+   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
    */
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         int nDatanodes,
                         boolean dataNodeFirst) throws IOException {
-    this(namenodePort, conf, nDatanodes, dataNodeFirst, true, 
-         MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
+    this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
   }
   
   /**
@@ -207,26 +234,16 @@
    * @param nDatanodes Number of datanodes   
    * @param dataNodeFirst should the datanode be brought up before the namenode?
    * @param formatNamenode should the namenode be formatted before starting up ?
+   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
    */
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         int nDatanodes,
                         boolean dataNodeFirst,
                         boolean formatNamenode ) throws IOException {
-    this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, 
-         MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
+    this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
   }
 
-  public MiniDFSCluster(int namenodePort, 
-                        Configuration conf,
-                        int nDatanodes,
-                        boolean dataNodeFirst,
-                        boolean formatNamenode,
-                        String[] racks) throws IOException {
-    this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, 
-         MAX_RETRIES, MAX_RETRIES_PER_PORT, racks);
-  }
-  
   /**
    * Create the config and start up the servers.  If either the rpc or info port is already 
    * in use, we will try new ports.
@@ -242,92 +259,94 @@
                         int nDatanodes,
                         boolean dataNodeFirst,
                         boolean formatNamenode,
-                        int numRetries,
-                        int numRetriesPerPort,
                         String[] racks) throws IOException {
 
     this.conf = conf;
-
+    
     this.nDatanodes = nDatanodes;
     this.nameNodePort = namenodePort;
-    this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default.
-    File base_dir = new File(System.getProperty("test.build.data"),
-                             "dfs/");
+
+    this.conf.set("fs.default.name", "localhost:"+ Integer.toString(nameNodePort));
+    this.conf.setInt("dfs.info.port", nameNodeInfoPort);
+    this.conf.setInt("dfs.datanode.info.port", 0);
+
+    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
     File data_dir = new File(base_dir, "data");
-    conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
+    this.conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
         new File(base_dir, "name2").getPath());
-    conf.setInt("dfs.replication", Math.min(3, nDatanodes));
-    conf.setInt("dfs.safemode.extension", 0);
+    this.conf.setInt("dfs.replication", Math.min(3, nDatanodes));
+    this.conf.setInt("dfs.safemode.extension", 0);
 
-    // Loops until we find ports that work or we give up because 
-    // too many tries have failed.
-    boolean foundPorts = false;
-    int portsTried = 0;
-    while ((!foundPorts) && (portsTried < numRetries)) {
-      conf.set("fs.default.name", 
-               "localhost:"+ Integer.toString(nameNodePort));
-      conf.set("dfs.info.port", nameNodeInfoPort);
-      
-      if (formatNamenode) { NameNode.format(conf); }
-      nameNode = new NameNodeRunner();
-      nameNodeThread = new Thread(nameNode);
-      dataNodes = new DataNodeRunner[nDatanodes];
-      dataNodeThreads = new Thread[nDatanodes];
-      for (int idx = 0; idx < nDatanodes; idx++) {
-        if( racks == null || idx >= racks.length) {
-          dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
-        } else {
-          dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);          
-        }
-        dataNodeThreads[idx] = new Thread(dataNodes[idx]);
+    // Create the NameNode
+    if (formatNamenode) { NameNode.format(conf); }
+    nameNode = new NameNodeRunner();
+    nameNodeThread = new Thread(nameNode);
+
+    //
+    // Start the MiniDFSCluster
+    //
+    
+    if (dataNodeFirst) {
+      startDataNodes(conf, racks, data_dir);
+    }
+    
+    // Start the namenode and wait for it to be initialized
+    nameNodeThread.start();
+    while (!nameNode.isCrashed() && !nameNode.isInitialized()) {
+      try {                                     // let daemons get started
+        System.err.println("Waiting for the NameNode to initialize...");
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
       }
-      if (dataNodeFirst) {
-        for (int idx = 0; idx < nDatanodes; idx++) {
-          dataNodeThreads[idx].start();
-        }
-        nameNodeThread.start();      
-      } else {
-        nameNodeThread.start();
-        for (int idx = 0; idx < nDatanodes; idx++) {
-          dataNodeThreads[idx].start();
-        }
+      if (nameNode.isCrashed()) {
+        throw new RuntimeException("Namenode crashed");
       }
-
-      int retry = 0;
-      while (!nameNode.isUp() && (retry < numRetriesPerPort)) {
-        try {                                     // let daemons get started
-          System.out.println("waiting for dfs minicluster to start");
-          Thread.sleep(1000);
-        } catch(InterruptedException e) {
-        }
-        retry++;
+    }
+    
+    // Set up the right ports for the datanodes
+    InetSocketAddress nnAddr = nameNode.getAddress(); 
+    nameNodePort = nnAddr.getPort(); 
+    this.conf.set("fs.default.name", nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
+    
+    // Start the datanodes
+    if (!dataNodeFirst) {
+      startDataNodes(conf, racks, data_dir);
+    }
+    
+    while (!nameNode.isCrashed() && !nameNode.isUp()) {
+      try {                                     // let daemons get started
+        System.err.println("Waiting for the Mini HDFS Cluster to start...");
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
       }
-      if (retry >= numRetriesPerPort) {
-        this.nameNodePort += 3;
-        this.nameNodeInfoPort += 7;
-        System.out.println("Failed to start DFS minicluster in " + retry + " attempts.  Trying new ports:");
-        System.out.println("\tNameNode RPC port: " + nameNodePort);
-        System.out.println("\tNameNode info port: " + nameNodeInfoPort);
+    }
+    
+    if (nameNode.isCrashed()) {
+      throw new RuntimeException("Namenode crashed");
+    }
+  }
 
-        shutdown();
-        
+  private void startDataNodes(Configuration conf, String[] racks, File data_dir) {
+    // Create the DataNodes & start them
+    dataNodes = new DataNodeRunner[nDatanodes];
+    dataNodeThreads = new Thread[nDatanodes];
+    for (int idx = 0; idx < nDatanodes; idx++) {
+      if( racks == null || idx >= racks.length) {
+        dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
       } else {
-        foundPorts = true;
+        dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);          
       }
-      portsTried++;
-    } 
-    System.out.println("\tNameNode portsTried " + portsTried);
-    if (portsTried >= numRetries) {
-        throw new IOException("Failed to start a DFS minicluster after trying " + portsTried + " ports.");
+      dataNodeThreads[idx] = new Thread(dataNodes[idx]);
+      dataNodeThreads[idx].start();
     }
   }
-
+  
   /**
    * Returns the rpc port used by the mini cluster, because the caller supplied port is 
    * not necessarily the actual port used.
    */     
   public int getNameNodePort() {
-    return nameNodePort;
+    return nameNode.getAddress().getPort();
   }
     
   /**

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java Mon Mar 26 12:13:04 2007
@@ -109,9 +109,9 @@
     //
     removeOneNameDir(namedirs);
     try {
-      cluster = new MiniDFSCluster(65312, conf, 1, 1);
+      cluster = new MiniDFSCluster(conf, 0, false, null);
       assertTrue(false);
-    } catch (IOException e) {
+    } catch (Throwable t) {
       // no nothing
     }
     resurrectNameDir(namedirs); // put back namedir
@@ -124,8 +124,8 @@
   throws IOException {
     System.out.println("Starting testSecondaryNamenodeError 1");
     Path file1 = new Path("checkpointxx.dat");
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, 
-                                                false, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
+                                                false, null);
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -161,8 +161,7 @@
     // namenode restart accounted for the rolled edit logs.
     //
     System.out.println("Starting testSecondaryNamenodeError 2");
-    cluster = new MiniDFSCluster(65312, conf, numDatanodes, 
-                                 false, false);
+    cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
     try {
@@ -184,8 +183,8 @@
   throws IOException {
     System.out.println("Starting testSecondaryNamenodeError 21");
     Path file1 = new Path("checkpointyy.dat");
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, 
-                                                false, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
+                                                false, null);
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -221,8 +220,7 @@
     // namenode restart accounted for the rolled edit logs.
     //
     System.out.println("Starting testSecondaryNamenodeError 22");
-    cluster = new MiniDFSCluster(65312, conf, numDatanodes, 
-                                 false, false);
+    cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
     try {
@@ -246,7 +244,7 @@
     File[] namedirs = null;
 
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
 
@@ -278,7 +276,7 @@
     //
     // Restart cluster and verify that file1 still exist.
     //
-    cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, false);
+    cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
     try {
@@ -305,7 +303,7 @@
     // Restart cluster and verify that file2 exists and
     // file1 does not exist.
     //
-    cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, false);
+    cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
     fileSys = cluster.getFileSystem();
 
     assertTrue(!fileSys.exists(file1));

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java Mon Mar 26 12:13:04 2007
@@ -43,7 +43,7 @@
    */
   public void testDFSMkdirs() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 2, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
     	// First create a new directory with mkdirs

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Mon Mar 26 12:13:04 2007
@@ -46,7 +46,7 @@
    */
   public void testDFSShell() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 2, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
             fs instanceof DistributedFileSystem);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java Mon Mar 26 12:13:04 2007
@@ -38,7 +38,7 @@
         MiniDFSCluster cluster = null;
         try {
           Configuration conf = new Configuration();
-          cluster = new MiniDFSCluster(65316, conf, true);
+          cluster = new MiniDFSCluster(conf, 1, true, null);
           namenode = conf.get("fs.default.name", "local");
           String [] args = new String[4];
           args[2] = "-mkdir";

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Mon Mar 26 12:13:04 2007
@@ -249,7 +249,7 @@
     conf.set("dfs.hosts.exclude", excludeFile.toString());
     writeConfigFile(localFileSys, excludeFile, null);
 
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
     // Now wait for 15 seconds to give datanodes chance to register
     // themselves and to report heartbeat
     try {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java Mon Mar 26 12:13:04 2007
@@ -48,7 +48,7 @@
     DFSTestUtil util = new DFSTestUtil("TestFileCorruption", 20, 3, 8*1024);
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf, 3, false);
+      cluster = new MiniDFSCluster(conf, 3, true, null);
       FileSystem fs = cluster.getFileSystem();
       util.createFiles(fs, "/srcdat");
       // Now deliberately remove the blocks

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java Mon Mar 26 12:13:04 2007
@@ -49,7 +49,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf, 4, false);
+      cluster = new MiniDFSCluster(conf, 4, true, null);
       FileSystem fs = cluster.getFileSystem();
       util.createFiles(fs, "/srcdat");
       PrintStream oldOut = System.out;
@@ -73,7 +73,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf, 4, false);
+      cluster = new MiniDFSCluster(conf, 4, true, null);
       FileSystem fs = cluster.getFileSystem();
       util.createFiles(fs, "/srcdat");
       PrintStream oldOut = System.out;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java Mon Mar 26 12:13:04 2007
@@ -55,7 +55,7 @@
    */
   public void testWorkingDirectory() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
       Path orig_path = fileSys.getWorkingDirectory();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java Mon Mar 26 12:13:04 2007
@@ -111,7 +111,7 @@
    */
   public void testPreadDFS() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 3, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
       Path file1 = new Path("preadtest.dat");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Mon Mar 26 12:13:04 2007
@@ -131,7 +131,7 @@
   public void testReplication() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean("dfs.replication.considerLoad", false);
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, true, racks);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, racks);
     cluster.waitActive();
     
     InetSocketAddress addr = new InetSocketAddress("localhost",

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java Mon Mar 26 12:13:04 2007
@@ -48,11 +48,10 @@
   
   /** check if DFS remains in proper condition after a restart */
   public void testRestartDFS() throws Exception {
-    String namenode = null;
     MiniDFSCluster cluster = null;
     DFSTestUtil files = new DFSTestUtil("TestRestartDFS", 20, 3, 8*1024);
     try {
-      cluster = new MiniDFSCluster(65314, conf, 4, false);
+      cluster = new MiniDFSCluster(conf, 4, true, null);
       FileSystem fs = cluster.getFileSystem();
       files.createFiles(fs, "/srcdat");
     } finally {
@@ -60,7 +59,7 @@
     }
     try {
       // Here we restart the MiniDFScluster without formatting namenode
-      cluster = new MiniDFSCluster(65320, conf, 4, false, false);
+      cluster = new MiniDFSCluster(conf, 4, false, null);
       FileSystem fs = cluster.getFileSystem();
       assertTrue("Filesystem corrupted after restart.",
             files.checkFiles(fs, "/srcdat"));

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java Mon Mar 26 12:13:04 2007
@@ -123,7 +123,7 @@
    */
   public void testSeekBugDFS() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
       Path file1 = new Path("seektest.dat");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java Mon Mar 26 12:13:04 2007
@@ -82,7 +82,7 @@
    */
   public void testSmallBlock() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 1, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
       Path file1 = new Path("smallblocktest.dat");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Mon Mar 26 12:13:04 2007
@@ -179,7 +179,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf, 2, false);
+      cluster = new MiniDFSCluster(conf, 2, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
@@ -201,7 +201,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65316, conf, false);
+      cluster = new MiniDFSCluster(conf, 1, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
@@ -223,7 +223,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65318, conf, false);
+      cluster = new MiniDFSCluster(conf, 1, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java Mon Mar 26 12:13:04 2007
@@ -35,7 +35,7 @@
   protected void setUp() throws Exception {
     try {
       Configuration conf = new Configuration();
-      dfsCluster = new MiniDFSCluster(8889, conf, true);
+      dfsCluster = new MiniDFSCluster(conf, 1, true, null);
       fs = FileSystem.get(conf);
     } catch (IOException e) {
       e.printStackTrace();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Mon Mar 26 12:13:04 2007
@@ -48,7 +48,8 @@
   private static class TestServer extends Server {
     private boolean sleep;
 
-    public TestServer(String bindAddress, int port, int handlerCount, boolean sleep) {
+    public TestServer(String bindAddress, int port, int handlerCount, boolean sleep) 
+    throws IOException {
       super(bindAddress, port, LongWritable.class, handlerCount, conf);
       this.setTimeout(1000);
       this.sleep = sleep;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Mar 26 12:13:04 2007
@@ -35,28 +35,36 @@
     private int jobTrackerInfoPort = 0;
     private int numTaskTrackers;
     
-    private List taskTrackerList = new ArrayList();
-    private List taskTrackerThreadList = new ArrayList();
+    private List<TaskTrackerRunner> taskTrackerList = new ArrayList<TaskTrackerRunner>();
+    private List<Thread> taskTrackerThreadList = new ArrayList<Thread>();
     
     private String namenode;
     
-    private int MAX_RETRIES_PER_PORT = 10;
-    private int MAX_RETRIES = 10;
-
     /**
      * An inner class that runs a job tracker.
      */
     class JobTrackerRunner implements Runnable {
 
+        JobConf jc = null;
+        
         public boolean isUp() {
             return (JobTracker.getTracker() != null);
         }
+        
+        public int getJobTrackerPort() {
+          return JobTracker.getAddress(jc).getPort();
+        }
+
+        public int getJobTrackerInfoPort() {
+          return jc.getInt("mapred.job.tracker.info.port", 50030);
+        }
+        
         /**
          * Create the job tracker and run it.
          */
         public void run() {
             try {
-                JobConf jc = createJobConf();
+                jc = createJobConf();
                 jc.set("mapred.local.dir","build/test/mapred/local");
                 JobTracker.startTracker(jc);
             } catch (Throwable e) {
@@ -83,12 +91,14 @@
      */
     class TaskTrackerRunner implements Runnable {
         volatile TaskTracker tt;
+        int trackerId;
         // the localDirs for this taskTracker
         String[] localDir;
         volatile boolean isInitialized = false;
         volatile boolean isDead = false;
         int numDir;       
-        TaskTrackerRunner(int numDir) {
+        TaskTrackerRunner(int trackerId, int numDir) {
+          this.trackerId = trackerId;
           this.numDir = numDir;
           // a maximum of 10 local dirs can be specified in MinMRCluster
           localDir = new String[10];
@@ -100,11 +110,11 @@
         public void run() {
             try {
                 JobConf jc = createJobConf();
-                jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
-                jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
+                jc.setInt("mapred.task.tracker.info.port", 0);
+                jc.setInt("mapred.task.tracker.report.port", taskTrackerPort);
                 File localDir = new File(jc.get("mapred.local.dir"));
                 String mapredDir = "";
-                File ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + 0);
+                File ttDir = new File(localDir, Integer.toString(trackerId) + "_" + 0);
                 if (!ttDir.mkdirs()) {
                   if (!ttDir.isDirectory()) {
                     throw new IOException("Mkdirs failed to create " + ttDir.toString());
@@ -113,7 +123,7 @@
                 this.localDir[0] = ttDir.getAbsolutePath();
                 mapredDir = ttDir.getAbsolutePath();
                 for (int i = 1; i < numDir; i++){
-                  ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + i);
+                  ttDir = new File(localDir, Integer.toString(trackerId) + "_" + i);
                   ttDir.mkdirs();
                   if (!ttDir.mkdirs()) {
                     if (!ttDir.isDirectory()) {
@@ -217,11 +227,24 @@
       JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
       return result;
     }
+
+    /**
+     * Create the config and the cluster.
+     * @param numTaskTrackers no. of tasktrackers in the cluster
+     * @param namenode the namenode
+     * @param numDir no. of directories
+     * @throws IOException
+     */
+    public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) 
+    throws IOException {
+      this(0, 0, numTaskTrackers, namenode, false, numDir);
+    }
     
     /**
      * Create the config and start up the servers.  The ports supplied by the user are
      * just used as suggestions.  If those ports are already in use, new ports
      * are tried.  The caller should call getJobTrackerPort to get the actual rpc port used.
+     * @deprecated use {@link #MiniMRCluster(int, String, int)}
      */
     public MiniMRCluster(int jobTrackerPort,
                          int taskTrackerPort,
@@ -231,71 +254,59 @@
         this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
              taskTrackerFirst, 1);
     } 
-  
+
     public MiniMRCluster(int jobTrackerPort,
             int taskTrackerPort,
             int numTaskTrackers,
             String namenode,
             boolean taskTrackerFirst, int numDir) throws IOException {
-        
+
         this.jobTrackerPort = jobTrackerPort;
         this.taskTrackerPort = taskTrackerPort;
-        this.jobTrackerInfoPort = 50030;
+        this.jobTrackerInfoPort = 0;
         this.numTaskTrackers = numTaskTrackers;
         this.namenode = namenode;
 
-        // Loop until we find a set of ports that are all unused or until we
-        // give up because it's taken too many tries.
-        boolean foundPorts = false;
-        int portsTried = 0;
-        while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
-          jobTracker = new JobTrackerRunner();
-          jobTrackerThread = new Thread(jobTracker);
-          if (!taskTrackerFirst) {
-            jobTrackerThread.start();
-          }
-          for (int idx = 0; idx < numTaskTrackers; idx++) {
-            TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
-            Thread taskTrackerThread = new Thread(taskTracker);
+        // Create the JobTracker
+        jobTracker = new JobTrackerRunner();
+        jobTrackerThread = new Thread(jobTracker);
+        
+        // Create the TaskTrackers
+        for (int idx = 0; idx < numTaskTrackers; idx++) {
+          TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir);
+          Thread taskTrackerThread = new Thread(taskTracker);
+          taskTrackerList.add(taskTracker);
+          taskTrackerThreadList.add(taskTrackerThread);
+        }
+
+        // Start the MiniMRCluster
+        
+        if (taskTrackerFirst) {
+          for (Thread taskTrackerThread : taskTrackerThreadList){
             taskTrackerThread.start();
-            taskTrackerList.add(taskTracker);
-            taskTrackerThreadList.add(taskTrackerThread);
-          }
-          if (taskTrackerFirst) {
-            jobTrackerThread.start();
-          }
-          int retry = 0;
-          while (!jobTracker.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
-            try {                                     // let daemons get started
-              System.err.println("waiting for jobtracker to start");
-              Thread.sleep(1000);
-            } catch(InterruptedException e) {
-            }
-            retry++;
-          }
-          if (retry >= MAX_RETRIES_PER_PORT) {
-              // Try new ports.
-              this.jobTrackerPort += 7;
-              this.jobTrackerInfoPort += 3;
-              this.taskTrackerPort++;
-
-              System.err.println("Failed to start MR minicluster in " + retry + 
-                                 " attempts.  Retrying with new ports:");
-              System.err.println("\tJobTracker RPC port = " + jobTrackerPort);
-              System.err.println("\tJobTracker info port = " + jobTrackerInfoPort);
-              System.err.println("\tTaskTracker RPC port(s) = " + 
-                                 taskTrackerPort + "-" + (taskTrackerPort+numTaskTrackers-1));
-              shutdown();
-              taskTrackerList.clear();
-          } else {
-            foundPorts = true;
           }
-          portsTried++;
         }
-        if (portsTried >= MAX_RETRIES) {
-            throw new IOException("Failed to start MR minicluster after trying " + portsTried + " ports.");
+        
+        jobTrackerThread.start();
+        while (!jobTracker.isUp()) {
+          try {                                     // let daemons get started
+            System.err.println("Waiting for JobTracker to start...");
+            Thread.sleep(1000);
+          } catch(InterruptedException e) {
+          }
         }
         
+        // Set the configuration for the task-trackers
+        this.jobTrackerPort = jobTracker.getJobTrackerPort();
+        this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
+
+        if (!taskTrackerFirst) {
+          for (Thread taskTrackerThread : taskTrackerThreadList){
+            taskTrackerThread.start();
+          }
+        }
+
+        // Wait till the MR cluster stabilizes
         waitUntilIdle();
     }
     
@@ -332,7 +343,7 @@
     
     public static void main(String[] args) throws IOException {
         System.out.println("Bringing up Jobtracker and tasktrackers.");
-        MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local", false);
+        MiniMRCluster mr = new MiniMRCluster(4, "local", 1);
         System.out.println("JobTracker and TaskTrackers are up.");
         mr.shutdown();
         System.out.println("JobTracker and TaskTrackers brought down.");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Mon Mar 26 12:13:04 2007
@@ -120,11 +120,10 @@
           final int taskTrackers = 4;
           final int jobTrackerPort = 60050;
           Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(65315, conf, true);
+          dfs = new MiniDFSCluster(conf, 1, true, null);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
-          mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
-                                 namenode, true, 2);
+          mr = new MiniMRCluster(taskTrackers, namenode, 2);
           final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
           JobConf jobConf = new JobConf();
           boolean result;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Mon Mar 26 12:13:04 2007
@@ -31,7 +31,7 @@
     public void testBringUp() throws IOException {
       MiniMRCluster mr = null;
       try {
-          mr = new MiniMRCluster(50000, 50010, 1, "local", false);
+          mr = new MiniMRCluster(1, "local", 1);
       } finally {
           if (mr != null) { mr.shutdown(); }
       }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Mon Mar 26 12:13:04 2007
@@ -157,11 +157,10 @@
           final int jobTrackerPort = 60050;
 
           Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(65314, conf, true);
+          dfs = new MiniDFSCluster(conf, 1, true, null);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
-          mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
-                                 namenode, true, 3);
+          mr = new MiniMRCluster(taskTrackers, namenode, 3);
           JobConf jobConf = new JobConf();
           String result;
           final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
@@ -191,14 +190,12 @@
     try {
       
       final int taskTrackers = 4;
-      final int jobTrackerPort = 60050;
 
       Configuration conf = new Configuration();
-      dfs = new MiniDFSCluster(65314, conf, true);
+      dfs = new MiniDFSCluster(conf, 1, true, null);
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getName();
-      mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, namenode, 
-        true, 3);      
+      mr = new MiniMRCluster(taskTrackers, namenode, 3);      
       JobConf jobConf = new JobConf();
       String result;
       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Mon Mar 26 12:13:04 2007
@@ -36,9 +36,9 @@
     FileSystem fileSys = null;
     try {
       JobConf conf = new JobConf();
-      dfs = new MiniDFSCluster(65314, conf, true);
+      dfs = new MiniDFSCluster(conf, 1, true, null);
       fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(60050, 50060, 2, fileSys.getName(), true, 4);
+      mr = new MiniMRCluster(2, fileSys.getName(), 4);
       // run the wordcount example with caching
       boolean ret = MRCaching.launchMRCache("/testing/wc/input",
                                             "/testing/wc/output",

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Mar 26 12:13:04 2007
@@ -48,7 +48,7 @@
   public void testWithLocal() throws IOException {
       MiniMRCluster mr = null;
       try {
-          mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
+          mr = new MiniMRCluster(2, "local", 3);
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
                                                mr.createJobConf());
           double error = Math.abs(Math.PI - estimate);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Mar 26 12:13:04 2007
@@ -155,13 +155,11 @@
       FileSystem fileSys = null;
       try {
           final int taskTrackers = 4;
-          final int jobTrackerPort = 60050;
 
           Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(65314, conf, 4, true);
+          dfs = new MiniDFSCluster(conf, 4, true, null);
           fileSys = dfs.getFileSystem();
-          mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
-                                 fileSys.getName(), true);
+          mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
                                                mr.createJobConf());
           double error = Math.abs(Math.PI - estimate);



Mime
View raw message