Author: cutting
Date: Mon Mar 26 12:13:04 2007
New Revision: 522597
URL: http://svn.apache.org/viewvc?view=rev&rev=522597
Log:
HADOOP-1085. Improve port selection in test code. Contributed by Arun.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Mar 26 12:13:04 2007
@@ -40,6 +40,11 @@
12. HADOOP-1131. Add a closeAll() static method to FileSystem.
(Philippe Gassmann via tomwhite)
+13. HADOOP-1085. Improve port selection in HDFS and MapReduce test
+ code. Ports are now selected by the OS during testing rather than
+ by probing for free ports, improving test reliability.
+ (Arun C Murthy via cutting)
+
Release 0.12.2 - 2007-23-17
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Mon Mar 26 12:13:04 2007
@@ -218,7 +218,7 @@
MiniDFSCluster cluster = null;
try {
if(overrideFS == null) {
- cluster = new MiniDFSCluster(NAME_PORT, conf_, false);
+ cluster = new MiniDFSCluster(conf_, 1, true, null);
fs_ = cluster.getFileSystem();
} else {
System.out.println("overrideFS: " + overrideFS);
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Mon Mar 26 12:13:04 2007
@@ -52,16 +52,15 @@
{
try {
boolean mayExit = false;
- int jobTrackerPort = 60050;
MiniMRCluster mr = null;
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
try{
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(8050, conf, false);
+ dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
String namenode = fileSys.getName();
- mr = new MiniMRCluster(jobTrackerPort, 60060, 1, namenode, true, 3);
+ mr = new MiniMRCluster(1, namenode, 3);
// During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster
String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Mar 26 12:13:04 2007
@@ -274,6 +274,11 @@
this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
this.infoServer.start();
+
+ // The web-server port can be ephemeral... ensure we have the correct info
+ this.infoPort = this.infoServer.getPort();
+ conf.set("dfs.info.port", this.infoPort);
+ LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Mar 26 12:13:04 2007
@@ -87,6 +87,8 @@
private Thread emptier;
private int handlerCount = 2;
+ private InetSocketAddress nameNodeAddress = null;
+
/** only used for testing purposes */
private boolean stopRequested = false;
@@ -175,19 +177,29 @@
*/
private void init(File[] dirs, String hostname, int port,
Configuration conf) throws IOException {
- this.namesystem = new FSNamesystem(dirs, hostname, port, this, conf);
this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
this.server = RPC.getServer(this, hostname, port, handlerCount,
false, conf);
this.server.start();
+ // The rpc-server port can be ephemeral... ensure we have the correct info
+ this.nameNodeAddress = this.server.getListenerAddress();
+ conf.set("fs.default.name", new String(nameNodeAddress.getHostName() + ":" + nameNodeAddress.getPort()));
+ LOG.info("Namenode up at: " + this.nameNodeAddress);
+
+ this.namesystem = new FSNamesystem(dirs, this.nameNodeAddress.getHostName(), this.nameNodeAddress.getPort(), this, conf);
+
this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
}
/**
- * Create a NameNode at the default location
+ * Create a NameNode at the default location.
+ *
+ * The conf will be modified to reflect the actual ports on which
+ * the NameNode is up and running if the user passes the port as
+ * <code>zero</code> in the conf.
*/
public NameNode(Configuration conf) throws IOException {
InetSocketAddress addr =
@@ -197,6 +209,10 @@
/**
* Create a NameNode at the specified location and start it.
+ *
+ * The conf will be modified to reflect the actual ports on which
+ * the NameNode is up and running if the user passes the port as
+ * <code>zero</code>.
*/
public NameNode(File[] dirs, String bindAddress, int port, Configuration conf) throws IOException {
init(dirs, bindAddress, port, conf);
@@ -697,6 +713,14 @@
return namesystem.getFsEditName();
}
+ /**
+ * Returns the address on which the NameNode is listening.
+ * @return the address on which the NameNode is listening.
+ */
+ public InetSocketAddress getNameNodeAddress() {
+ return nameNodeAddress;
+ }
+
/**
*/
public static void main(String argv[]) throws Exception {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Mon Mar 26 12:13:04 2007
@@ -279,7 +279,8 @@
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
- public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) {
+ public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
+ throws IOException {
return getServer(instance, bindAddress, port, 1, false, conf);
}
@@ -287,7 +288,8 @@
* port and address. */
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
- final boolean verbose, Configuration conf) {
+ final boolean verbose, Configuration conf)
+ throws IOException {
return new Server(instance, conf, bindAddress,port, numHandlers, verbose);
}
@@ -303,7 +305,8 @@
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*/
- public Server(Object instance, Configuration conf, String bindAddress, int port) {
+ public Server(Object instance, Configuration conf, String bindAddress, int port)
+ throws IOException {
this(instance, conf, bindAddress, port, 1, false);
}
@@ -316,7 +319,7 @@
* @param verbose whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress, int port,
- int numHandlers, boolean verbose) {
+ int numHandlers, boolean verbose) throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, conf);
this.instance = instance;
this.implementation = instance.getClass();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Mar 26 12:13:04 2007
@@ -182,6 +182,7 @@
// Bind the server socket to the local host and port
acceptChannel.socket().bind(address, backlogLength);
+ port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
@@ -313,6 +314,10 @@
}
}
+ InetSocketAddress getAddress() {
+ return new InetSocketAddress(acceptChannel.socket().getInetAddress(), acceptChannel.socket().getLocalPort());
+ }
+
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
@@ -599,7 +604,8 @@
* the number of handler threads that will be used to process calls.
*
*/
- protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) {
+ protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf)
+ throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
@@ -611,6 +617,10 @@
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+
+ // Start the listener here and let it bind to the port
+ listener = new Listener();
+ this.port = listener.getAddress().getPort();
}
/** Sets the timeout used for network i/o. */
@@ -618,7 +628,6 @@
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() throws IOException {
- listener = new Listener();
listener.start();
for (int i = 0; i < handlerCount; i++) {
@@ -645,6 +654,14 @@
}
}
+ /**
+ * Returns the socket address (ip+port) on which the RPC server is listening.
+ * @return the socket address (ip+port) on which the RPC server is listening.
+ */
+ public synchronized InetSocketAddress getListenerAddress() {
+ return listener.getAddress();
+ }
+
/** Called for each call. */
public abstract Writable call(Writable param) throws IOException;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Mar 26 12:13:04 2007
@@ -84,6 +84,17 @@
private static JobTracker tracker = null;
private static boolean runTracker = true;
+
+ /**
+ * Start the JobTracker with given configuration.
+ *
+ * The conf will be modified to reflect the actual ports on which
+ * the JobTracker is up and running if the user passes the port as
+ * <code>zero</code>.
+ *
+ * @param conf configuration for the JobTracker.
+ * @throws IOException
+ */
public static void startTracker(Configuration conf) throws IOException {
if (tracker != null)
throw new IOException("JobTracker already running.");
@@ -630,6 +641,15 @@
this.initJobsThread = new Thread(this.initJobs, "initJobs");
this.initJobsThread.start();
expireLaunchingTaskThread.start();
+
+ // The rpc/web-server ports can be ephemeral ports...
+ // ... ensure we have the correct info
+ this.port = interTrackerServer.getListenerAddress().getPort();
+ this.conf.set("mapred.job.tracker", new String(this.localMachine + ":" + this.port));
+ LOG.info("JobTracker up at: " + this.port);
+ this.infoPort = this.infoServer.getPort();
+ this.conf.set("mapred.job.tracker.info.port", this.infoPort);
+ LOG.info("JobTracker webserver: " + this.infoServer.getPort());
}
public static InetSocketAddress getAddress(Configuration conf) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 26 12:13:04 2007
@@ -306,6 +306,12 @@
}
}
+ // The rpc-server port can be ephemeral...
+ // ... ensure we have the correct info
+ this.taskReportPort = taskReportServer.getListenerAddress().getPort();
+ this.fConf.setInt("mapred.task.tracker.report.port", this.taskReportPort);
+ LOG.info("TaskTracker up at: " + this.taskReportPort);
+
this.taskTrackerName = "tracker_" +
localHostname + ":" + taskReportPort;
LOG.info("Starting tracker " + taskTrackerName);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Mon Mar 26 12:13:04 2007
@@ -37,8 +37,6 @@
private Thread dataNodeThreads[];
private NameNodeRunner nameNode;
private DataNodeRunner dataNodes[];
- static public int MAX_RETRIES = 10;
- static public int MAX_RETRIES_PER_PORT = 10;
private int nameNodePort = 0;
private int nameNodeInfoPort = 0;
@@ -48,15 +46,33 @@
*/
class NameNodeRunner implements Runnable {
private NameNode node;
+ private volatile boolean isInitialized = false;
+ private boolean isCrashed = false;
private boolean isRunning = true;
+
+ public InetSocketAddress getAddress() {
+ return node.getNameNodeAddress();
+ }
+
+ synchronized public boolean isInitialized() {
+ return isInitialized;
+ }
+ synchronized public boolean isCrashed() {
+ return isCrashed;
+ }
+
public boolean isUp() {
if (node == null) {
return false;
}
try {
long[] sizes = node.getStats();
- return !node.isInSafeMode() && sizes[0] != 0;
+ boolean isUp = false;
+ synchronized (this) {
+ isUp = (isInitialized && !node.isInSafeMode() && sizes[0] != 0);
+ }
+ return isUp;
} catch (IOException ie) {
return false;
}
@@ -71,11 +87,15 @@
if( isRunning ) {
node = new NameNode(conf);
}
+ isInitialized = true;
}
} catch (Throwable e) {
node = null;
System.err.println("Name node crashed:");
e.printStackTrace();
+ synchronized (this) {
+ isCrashed = true;
+ }
}
}
@@ -155,18 +175,25 @@
}
}
+ public MiniDFSCluster(Configuration conf,
+ int nDatanodes,
+ boolean formatNamenode,
+ String[] racks) throws IOException {
+ this(0, conf, nDatanodes, false, formatNamenode, racks);
+ }
+
/**
* Create the config and start up the servers. If either the rpc or info port is already
* in use, we will try new ports.
* @param namenodePort suggestion for which rpc port to use. caller should use
* getNameNodePort() to get the actual port used.
* @param dataNodeFirst should the datanode be brought up before the namenode?
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
*/
public MiniDFSCluster(int namenodePort,
Configuration conf,
boolean dataNodeFirst) throws IOException {
- this(namenodePort, conf, 1, dataNodeFirst, true,
- MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
+ this(namenodePort, conf, 1, dataNodeFirst, true, null);
}
/**
@@ -174,13 +201,13 @@
* in use, we will try new ports.
* @param namenodePort suggestion for which rpc port to use. caller should use
* getNameNodePort() to get the actual port used.
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
*/
public MiniDFSCluster(int namenodePort,
Configuration conf,
int numRetries,
int numRetriesPerPort) throws IOException {
- this(namenodePort, conf, 0, false, false,
- numRetries, numRetriesPerPort, null);
+ this(namenodePort, conf, 0, false, false, null);
}
/**
@@ -190,13 +217,13 @@
* getNameNodePort() to get the actual port used.
* @param nDatanodes Number of datanodes
* @param dataNodeFirst should the datanode be brought up before the namenode?
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
*/
public MiniDFSCluster(int namenodePort,
Configuration conf,
int nDatanodes,
boolean dataNodeFirst) throws IOException {
- this(namenodePort, conf, nDatanodes, dataNodeFirst, true,
- MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
+ this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
}
/**
@@ -207,26 +234,16 @@
* @param nDatanodes Number of datanodes
* @param dataNodeFirst should the datanode be brought up before the namenode?
* @param formatNamenode should the namenode be formatted before starting up ?
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
*/
public MiniDFSCluster(int namenodePort,
Configuration conf,
int nDatanodes,
boolean dataNodeFirst,
boolean formatNamenode ) throws IOException {
- this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode,
- MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
+ this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
}
- public MiniDFSCluster(int namenodePort,
- Configuration conf,
- int nDatanodes,
- boolean dataNodeFirst,
- boolean formatNamenode,
- String[] racks) throws IOException {
- this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode,
- MAX_RETRIES, MAX_RETRIES_PER_PORT, racks);
- }
-
/**
* Create the config and start up the servers. If either the rpc or info port is already
* in use, we will try new ports.
@@ -242,92 +259,94 @@
int nDatanodes,
boolean dataNodeFirst,
boolean formatNamenode,
- int numRetries,
- int numRetriesPerPort,
String[] racks) throws IOException {
this.conf = conf;
-
+
this.nDatanodes = nDatanodes;
this.nameNodePort = namenodePort;
- this.nameNodeInfoPort = 50080; // We just want this port to be different from the default.
- File base_dir = new File(System.getProperty("test.build.data"),
- "dfs/");
+
+ this.conf.set("fs.default.name", "localhost:"+ Integer.toString(nameNodePort));
+ this.conf.setInt("dfs.info.port", nameNodeInfoPort);
+ this.conf.setInt("dfs.datanode.info.port", 0);
+
+ File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
File data_dir = new File(base_dir, "data");
- conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
+ this.conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
new File(base_dir, "name2").getPath());
- conf.setInt("dfs.replication", Math.min(3, nDatanodes));
- conf.setInt("dfs.safemode.extension", 0);
+ this.conf.setInt("dfs.replication", Math.min(3, nDatanodes));
+ this.conf.setInt("dfs.safemode.extension", 0);
- // Loops until we find ports that work or we give up because
- // too many tries have failed.
- boolean foundPorts = false;
- int portsTried = 0;
- while ((!foundPorts) && (portsTried < numRetries)) {
- conf.set("fs.default.name",
- "localhost:"+ Integer.toString(nameNodePort));
- conf.set("dfs.info.port", nameNodeInfoPort);
-
- if (formatNamenode) { NameNode.format(conf); }
- nameNode = new NameNodeRunner();
- nameNodeThread = new Thread(nameNode);
- dataNodes = new DataNodeRunner[nDatanodes];
- dataNodeThreads = new Thread[nDatanodes];
- for (int idx = 0; idx < nDatanodes; idx++) {
- if( racks == null || idx >= racks.length) {
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
- } else {
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
- }
- dataNodeThreads[idx] = new Thread(dataNodes[idx]);
+ // Create the NameNode
+ if (formatNamenode) { NameNode.format(conf); }
+ nameNode = new NameNodeRunner();
+ nameNodeThread = new Thread(nameNode);
+
+ //
+ // Start the MiniDFSCluster
+ //
+
+ if (dataNodeFirst) {
+ startDataNodes(conf, racks, data_dir);
+ }
+
+ // Start the namenode and wait for it to be initialized
+ nameNodeThread.start();
+ while (!nameNode.isCrashed() && !nameNode.isInitialized()) {
+ try { // let daemons get started
+ System.err.println("Waiting for the NameNode to initialize...");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
}
- if (dataNodeFirst) {
- for (int idx = 0; idx < nDatanodes; idx++) {
- dataNodeThreads[idx].start();
- }
- nameNodeThread.start();
- } else {
- nameNodeThread.start();
- for (int idx = 0; idx < nDatanodes; idx++) {
- dataNodeThreads[idx].start();
- }
+ if (nameNode.isCrashed()) {
+ throw new RuntimeException("Namenode crashed");
}
-
- int retry = 0;
- while (!nameNode.isUp() && (retry < numRetriesPerPort)) {
- try { // let daemons get started
- System.out.println("waiting for dfs minicluster to start");
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- }
- retry++;
+ }
+
+ // Set up the right ports for the datanodes
+ InetSocketAddress nnAddr = nameNode.getAddress();
+ nameNodePort = nnAddr.getPort();
+ this.conf.set("fs.default.name", nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
+
+ // Start the datanodes
+ if (!dataNodeFirst) {
+ startDataNodes(conf, racks, data_dir);
+ }
+
+ while (!nameNode.isCrashed() && !nameNode.isUp()) {
+ try { // let daemons get started
+ System.err.println("Waiting for the Mini HDFS Cluster to start...");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
}
- if (retry >= numRetriesPerPort) {
- this.nameNodePort += 3;
- this.nameNodeInfoPort += 7;
- System.out.println("Failed to start DFS minicluster in " + retry + " attempts. Trying new ports:");
- System.out.println("\tNameNode RPC port: " + nameNodePort);
- System.out.println("\tNameNode info port: " + nameNodeInfoPort);
+ }
+
+ if (nameNode.isCrashed()) {
+ throw new RuntimeException("Namenode crashed");
+ }
+ }
- shutdown();
-
+ private void startDataNodes(Configuration conf, String[] racks, File data_dir) {
+ // Create the DataNodes & start them
+ dataNodes = new DataNodeRunner[nDatanodes];
+ dataNodeThreads = new Thread[nDatanodes];
+ for (int idx = 0; idx < nDatanodes; idx++) {
+ if( racks == null || idx >= racks.length) {
+ dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
} else {
- foundPorts = true;
+ dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
}
- portsTried++;
- }
- System.out.println("\tNameNode portsTried " + portsTried);
- if (portsTried >= numRetries) {
- throw new IOException("Failed to start a DFS minicluster after trying " + portsTried + " ports.");
+ dataNodeThreads[idx] = new Thread(dataNodes[idx]);
+ dataNodeThreads[idx].start();
}
}
-
+
/**
* Returns the rpc port used by the mini cluster, because the caller supplied port is
* not necessarily the actual port used.
*/
public int getNameNodePort() {
- return nameNodePort;
+ return nameNode.getAddress().getPort();
}
/**
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java Mon Mar 26 12:13:04 2007
@@ -109,9 +109,9 @@
//
removeOneNameDir(namedirs);
try {
- cluster = new MiniDFSCluster(65312, conf, 1, 1);
+ cluster = new MiniDFSCluster(conf, 0, false, null);
assertTrue(false);
- } catch (IOException e) {
+ } catch (Throwable t) {
// no nothing
}
resurrectNameDir(namedirs); // put back namedir
@@ -124,8 +124,8 @@
throws IOException {
System.out.println("Starting testSecondaryNamenodeError 1");
Path file1 = new Path("checkpointxx.dat");
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes,
- false, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes,
+ false, null);
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
try {
@@ -161,8 +161,7 @@
// namenode restart accounted for the rolled edit logs.
//
System.out.println("Starting testSecondaryNamenodeError 2");
- cluster = new MiniDFSCluster(65312, conf, numDatanodes,
- false, false);
+ cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
cluster.waitActive();
fileSys = cluster.getFileSystem();
try {
@@ -184,8 +183,8 @@
throws IOException {
System.out.println("Starting testSecondaryNamenodeError 21");
Path file1 = new Path("checkpointyy.dat");
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes,
- false, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes,
+ false, null);
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
try {
@@ -221,8 +220,7 @@
// namenode restart accounted for the rolled edit logs.
//
System.out.println("Starting testSecondaryNamenodeError 22");
- cluster = new MiniDFSCluster(65312, conf, numDatanodes,
- false, false);
+ cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
cluster.waitActive();
fileSys = cluster.getFileSystem();
try {
@@ -246,7 +244,7 @@
File[] namedirs = null;
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
@@ -278,7 +276,7 @@
//
// Restart cluster and verify that file1 still exist.
//
- cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, false);
+ cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
cluster.waitActive();
fileSys = cluster.getFileSystem();
try {
@@ -305,7 +303,7 @@
// Restart cluster and verify that file2 exists and
// file1 does not exist.
//
- cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, false);
+ cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
fileSys = cluster.getFileSystem();
assertTrue(!fileSys.exists(file1));
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java Mon Mar 26 12:13:04 2007
@@ -43,7 +43,7 @@
*/
public void testDFSMkdirs() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 2, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
FileSystem fileSys = cluster.getFileSystem();
try {
// First create a new directory with mkdirs
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Mon Mar 26 12:13:04 2007
@@ -46,7 +46,7 @@
*/
public void testDFSShell() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 2, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
fs instanceof DistributedFileSystem);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShellGenericOptions.java Mon Mar 26 12:13:04 2007
@@ -38,7 +38,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65316, conf, true);
+ cluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "local");
String [] args = new String[4];
args[2] = "-mkdir";
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Mon Mar 26 12:13:04 2007
@@ -249,7 +249,7 @@
conf.set("dfs.hosts.exclude", excludeFile.toString());
writeConfigFile(localFileSys, excludeFile, null);
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
// Now wait for 15 seconds to give datanodes chance to register
// themselves and to report heartbeat
try {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java Mon Mar 26 12:13:04 2007
@@ -48,7 +48,7 @@
DFSTestUtil util = new DFSTestUtil("TestFileCorruption", 20, 3, 8*1024);
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65314, conf, 3, false);
+ cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fs = cluster.getFileSystem();
util.createFiles(fs, "/srcdat");
// Now deliberately remove the blocks
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java Mon Mar 26 12:13:04 2007
@@ -49,7 +49,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65314, conf, 4, false);
+ cluster = new MiniDFSCluster(conf, 4, true, null);
FileSystem fs = cluster.getFileSystem();
util.createFiles(fs, "/srcdat");
PrintStream oldOut = System.out;
@@ -73,7 +73,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65314, conf, 4, false);
+ cluster = new MiniDFSCluster(conf, 4, true, null);
FileSystem fs = cluster.getFileSystem();
util.createFiles(fs, "/srcdat");
PrintStream oldOut = System.out;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java Mon Mar 26 12:13:04 2007
@@ -55,7 +55,7 @@
*/
public void testWorkingDirectory() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fileSys = cluster.getFileSystem();
try {
Path orig_path = fileSys.getWorkingDirectory();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java Mon Mar 26 12:13:04 2007
@@ -111,7 +111,7 @@
*/
public void testPreadDFS() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 3, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fileSys = cluster.getFileSystem();
try {
Path file1 = new Path("preadtest.dat");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Mon Mar 26 12:13:04 2007
@@ -131,7 +131,7 @@
public void testReplication() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean("dfs.replication.considerLoad", false);
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, true, racks);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, racks);
cluster.waitActive();
InetSocketAddress addr = new InetSocketAddress("localhost",
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java Mon Mar 26 12:13:04 2007
@@ -48,11 +48,10 @@
/** check if DFS remains in proper condition after a restart */
public void testRestartDFS() throws Exception {
- String namenode = null;
MiniDFSCluster cluster = null;
DFSTestUtil files = new DFSTestUtil("TestRestartDFS", 20, 3, 8*1024);
try {
- cluster = new MiniDFSCluster(65314, conf, 4, false);
+ cluster = new MiniDFSCluster(conf, 4, true, null);
FileSystem fs = cluster.getFileSystem();
files.createFiles(fs, "/srcdat");
} finally {
@@ -60,7 +59,7 @@
}
try {
// Here we restart the MiniDFSCluster without formatting the namenode
- cluster = new MiniDFSCluster(65320, conf, 4, false, false);
+ cluster = new MiniDFSCluster(conf, 4, false, null);
FileSystem fs = cluster.getFileSystem();
assertTrue("Filesystem corrupted after restart.",
files.checkFiles(fs, "/srcdat"));
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java Mon Mar 26 12:13:04 2007
@@ -123,7 +123,7 @@
*/
public void testSeekBugDFS() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fileSys = cluster.getFileSystem();
try {
Path file1 = new Path("seektest.dat");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java Mon Mar 26 12:13:04 2007
@@ -82,7 +82,7 @@
*/
public void testSmallBlock() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 1, false);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fileSys = cluster.getFileSystem();
try {
Path file1 = new Path("smallblocktest.dat");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Mon Mar 26 12:13:04 2007
@@ -179,7 +179,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65314, conf, 2, false);
+ cluster = new MiniDFSCluster(conf, 2, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles(namenode, "/srcdat");
@@ -201,7 +201,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65316, conf, false);
+ cluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
@@ -223,7 +223,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65318, conf, false);
+ cluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles(namenode, "/srcdat");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestGlobPaths.java Mon Mar 26 12:13:04 2007
@@ -35,7 +35,7 @@
protected void setUp() throws Exception {
try {
Configuration conf = new Configuration();
- dfsCluster = new MiniDFSCluster(8889, conf, true);
+ dfsCluster = new MiniDFSCluster(conf, 1, true, null);
fs = FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Mon Mar 26 12:13:04 2007
@@ -48,7 +48,8 @@
private static class TestServer extends Server {
private boolean sleep;
- public TestServer(String bindAddress, int port, int handlerCount, boolean sleep) {
+ public TestServer(String bindAddress, int port, int handlerCount, boolean sleep)
+ throws IOException {
super(bindAddress, port, LongWritable.class, handlerCount, conf);
this.setTimeout(1000);
this.sleep = sleep;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Mar 26 12:13:04 2007
@@ -35,28 +35,36 @@
private int jobTrackerInfoPort = 0;
private int numTaskTrackers;
- private List taskTrackerList = new ArrayList();
- private List taskTrackerThreadList = new ArrayList();
+ private List<TaskTrackerRunner> taskTrackerList = new ArrayList<TaskTrackerRunner>();
+ private List<Thread> taskTrackerThreadList = new ArrayList<Thread>();
private String namenode;
- private int MAX_RETRIES_PER_PORT = 10;
- private int MAX_RETRIES = 10;
-
/**
* An inner class that runs a job tracker.
*/
class JobTrackerRunner implements Runnable {
+ JobConf jc = null;
+
public boolean isUp() {
return (JobTracker.getTracker() != null);
}
+
+ public int getJobTrackerPort() {
+ return JobTracker.getAddress(jc).getPort();
+ }
+
+ public int getJobTrackerInfoPort() {
+ return jc.getInt("mapred.job.tracker.info.port", 50030);
+ }
+
/**
* Create the job tracker and run it.
*/
public void run() {
try {
- JobConf jc = createJobConf();
+ jc = createJobConf();
jc.set("mapred.local.dir","build/test/mapred/local");
JobTracker.startTracker(jc);
} catch (Throwable e) {
@@ -83,12 +91,14 @@
*/
class TaskTrackerRunner implements Runnable {
volatile TaskTracker tt;
+ int trackerId;
// the localDirs for this taskTracker
String[] localDir;
volatile boolean isInitialized = false;
volatile boolean isDead = false;
int numDir;
- TaskTrackerRunner(int numDir) {
+ TaskTrackerRunner(int trackerId, int numDir) {
+ this.trackerId = trackerId;
this.numDir = numDir;
// a maximum of 10 local dirs can be specified in MiniMRCluster
localDir = new String[10];
@@ -100,11 +110,11 @@
public void run() {
try {
JobConf jc = createJobConf();
- jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
- jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
+ jc.setInt("mapred.task.tracker.info.port", 0);
+ jc.setInt("mapred.task.tracker.report.port", taskTrackerPort);
File localDir = new File(jc.get("mapred.local.dir"));
String mapredDir = "";
- File ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + 0);
+ File ttDir = new File(localDir, Integer.toString(trackerId) + "_" + 0);
if (!ttDir.mkdirs()) {
if (!ttDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + ttDir.toString());
@@ -113,7 +123,7 @@
this.localDir[0] = ttDir.getAbsolutePath();
mapredDir = ttDir.getAbsolutePath();
for (int i = 1; i < numDir; i++){
- ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + i);
+ ttDir = new File(localDir, Integer.toString(trackerId) + "_" + i);
ttDir.mkdirs();
if (!ttDir.mkdirs()) {
if (!ttDir.isDirectory()) {
@@ -217,11 +227,24 @@
JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
return result;
}
+
+ /**
+ * Create the config and the cluster.
+ * @param numTaskTrackers no. of tasktrackers in the cluster
+ * @param namenode the namenode
+ * @param numDir no. of directories
+ * @throws IOException
+ */
+ public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
+ throws IOException {
+ this(0, 0, numTaskTrackers, namenode, false, numDir);
+ }
/**
* Create the config and start up the servers. The ports supplied by the user are
* just used as suggestions. If those ports are already in use, new ports
* are tried. The caller should call getJobTrackerPort to get the actual rpc port used.
+ * @deprecated use {@link #MiniMRCluster(int, String, int)}
*/
public MiniMRCluster(int jobTrackerPort,
int taskTrackerPort,
@@ -231,71 +254,59 @@
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode,
taskTrackerFirst, 1);
}
-
+
public MiniMRCluster(int jobTrackerPort,
int taskTrackerPort,
int numTaskTrackers,
String namenode,
boolean taskTrackerFirst, int numDir) throws IOException {
-
+
this.jobTrackerPort = jobTrackerPort;
this.taskTrackerPort = taskTrackerPort;
- this.jobTrackerInfoPort = 50030;
+ this.jobTrackerInfoPort = 0;
this.numTaskTrackers = numTaskTrackers;
this.namenode = namenode;
- // Loop until we find a set of ports that are all unused or until we
- // give up because it's taken too many tries.
- boolean foundPorts = false;
- int portsTried = 0;
- while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
- jobTracker = new JobTrackerRunner();
- jobTrackerThread = new Thread(jobTracker);
- if (!taskTrackerFirst) {
- jobTrackerThread.start();
- }
- for (int idx = 0; idx < numTaskTrackers; idx++) {
- TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
- Thread taskTrackerThread = new Thread(taskTracker);
+ // Create the JobTracker
+ jobTracker = new JobTrackerRunner();
+ jobTrackerThread = new Thread(jobTracker);
+
+ // Create the TaskTrackers
+ for (int idx = 0; idx < numTaskTrackers; idx++) {
+ TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir);
+ Thread taskTrackerThread = new Thread(taskTracker);
+ taskTrackerList.add(taskTracker);
+ taskTrackerThreadList.add(taskTrackerThread);
+ }
+
+ // Start the MiniMRCluster
+
+ if (taskTrackerFirst) {
+ for (Thread taskTrackerThread : taskTrackerThreadList){
taskTrackerThread.start();
- taskTrackerList.add(taskTracker);
- taskTrackerThreadList.add(taskTrackerThread);
- }
- if (taskTrackerFirst) {
- jobTrackerThread.start();
- }
- int retry = 0;
- while (!jobTracker.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
- try { // let daemons get started
- System.err.println("waiting for jobtracker to start");
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- }
- retry++;
- }
- if (retry >= MAX_RETRIES_PER_PORT) {
- // Try new ports.
- this.jobTrackerPort += 7;
- this.jobTrackerInfoPort += 3;
- this.taskTrackerPort++;
-
- System.err.println("Failed to start MR minicluster in " + retry +
- " attempts. Retrying with new ports:");
- System.err.println("\tJobTracker RPC port = " + jobTrackerPort);
- System.err.println("\tJobTracker info port = " + jobTrackerInfoPort);
- System.err.println("\tTaskTracker RPC port(s) = " +
- taskTrackerPort + "-" + (taskTrackerPort+numTaskTrackers-1));
- shutdown();
- taskTrackerList.clear();
- } else {
- foundPorts = true;
}
- portsTried++;
}
- if (portsTried >= MAX_RETRIES) {
- throw new IOException("Failed to start MR minicluster after trying " + portsTried + " ports.");
+
+ jobTrackerThread.start();
+ while (!jobTracker.isUp()) {
+ try { // let daemons get started
+ System.err.println("Waiting for JobTracker to start...");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ }
}
+ // Set the configuration for the task-trackers
+ this.jobTrackerPort = jobTracker.getJobTrackerPort();
+ this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
+
+ if (!taskTrackerFirst) {
+ for (Thread taskTrackerThread : taskTrackerThreadList){
+ taskTrackerThread.start();
+ }
+ }
+
+ // Wait till the MR cluster stabilizes
waitUntilIdle();
}
@@ -332,7 +343,7 @@
public static void main(String[] args) throws IOException {
System.out.println("Bringing up Jobtracker and tasktrackers.");
- MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local", false);
+ MiniMRCluster mr = new MiniMRCluster(4, "local", 1);
System.out.println("JobTracker and TaskTrackers are up.");
mr.shutdown();
System.out.println("JobTracker and TaskTrackers brought down.");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Mon Mar 26 12:13:04 2007
@@ -120,11 +120,10 @@
final int taskTrackers = 4;
final int jobTrackerPort = 60050;
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(65315, conf, true);
+ dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
- mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
- namenode, true, 2);
+ mr = new MiniMRCluster(taskTrackers, namenode, 2);
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = new JobConf();
boolean result;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Mon Mar 26 12:13:04 2007
@@ -31,7 +31,7 @@
public void testBringUp() throws IOException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(50000, 50010, 1, "local", false);
+ mr = new MiniMRCluster(1, "local", 1);
} finally {
if (mr != null) { mr.shutdown(); }
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Mon Mar 26 12:13:04 2007
@@ -157,11 +157,10 @@
final int jobTrackerPort = 60050;
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(65314, conf, true);
+ dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
- mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
- namenode, true, 3);
+ mr = new MiniMRCluster(taskTrackers, namenode, 3);
JobConf jobConf = new JobConf();
String result;
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
@@ -191,14 +190,12 @@
try {
final int taskTrackers = 4;
- final int jobTrackerPort = 60050;
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(65314, conf, true);
+ dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
- mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, namenode,
- true, 3);
+ mr = new MiniMRCluster(taskTrackers, namenode, 3);
JobConf jobConf = new JobConf();
String result;
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Mon Mar 26 12:13:04 2007
@@ -36,9 +36,9 @@
FileSystem fileSys = null;
try {
JobConf conf = new JobConf();
- dfs = new MiniDFSCluster(65314, conf, true);
+ dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
- mr = new MiniMRCluster(60050, 50060, 2, fileSys.getName(), true, 4);
+ mr = new MiniMRCluster(2, fileSys.getName(), 4);
// run the wordcount example with caching
boolean ret = MRCaching.launchMRCache("/testing/wc/input",
"/testing/wc/output",
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Mar 26 12:13:04 2007
@@ -48,7 +48,7 @@
public void testWithLocal() throws IOException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
+ mr = new MiniMRCluster(2, "local", 3);
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES,
mr.createJobConf());
double error = Math.abs(Math.PI - estimate);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=522597&r1=522596&r2=522597
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Mar 26 12:13:04 2007
@@ -155,13 +155,11 @@
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
- final int jobTrackerPort = 60050;
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(65314, conf, 4, true);
+ dfs = new MiniDFSCluster(conf, 4, true, null);
fileSys = dfs.getFileSystem();
- mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
- fileSys.getName(), true);
+ mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES,
mr.createJobConf());
double error = Math.abs(Math.PI - estimate);
|