Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 19 12:15:51 2009
@@ -81,11 +81,12 @@
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -94,13 +95,19 @@
* in a networked environment. It contacts the JobTracker
* for Task assignments and reporting results.
*
+ *
+ * The TaskTracker has a complex lifecycle in that it
+ * can be "recycled"; after {@link #closeTaskTracker()} is called,
+ * it can be reset using {@link #initialize()}. This is
+ * within the {@link Service} lifecycle.
*******************************************************/
-public class TaskTracker
+public class TaskTracker extends Service
implements MRConstants, TaskUmbilicalProtocol, Runnable {
- static final long WAIT_FOR_DONE = 3 * 1000;
- private int httpPort;
+ /** time to wait for a finished task to be reported as done: {@value}*/
+ private static final long WAIT_FOR_DONE = 3 * 1000;
+ int httpPort;
- static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+ enum State {NORMAL, STALE, INTERRUPTED, DENIED}
static{
Configuration.addDefaultResource("mapred-default.xml");
@@ -119,7 +126,10 @@
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- volatile boolean running = true;
+ /**
+ * Flag used to synchronize running state across threads.
+ */
+ private volatile boolean running = false;
private LocalDirAllocator localDirAllocator;
String taskTrackerName;
@@ -134,7 +144,7 @@
// last heartbeat response recieved
short heartbeatResponseId = -1;
- /*
+ /**
* This is the last 'status' report sent by this tracker to the JobTracker.
*
* If the rpc call succeeds, this 'status' is cleared-out by this tracker;
@@ -144,14 +154,15 @@
*/
TaskTrackerStatus status = null;
- // The system-directory on HDFS where job files are stored
+ /** The system-directory on HDFS where job files are stored */
Path systemDirectory = null;
- // The filesystem where job files are stored
+ /** The filesystem where job files are stored */
FileSystem systemFS = null;
- private final HttpServer server;
+ private HttpServer server;
+ /** Flag used to signal across threads that shutdown is in progress. */
volatile boolean shuttingDown = false;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -344,33 +355,7 @@
/**
* A daemon-thread that pulls tips off the list of things to cleanup.
*/
- private Thread taskCleanupThread =
- new Thread(new Runnable() {
- public void run() {
- while (true) {
- try {
- TaskTrackerAction action = tasksToCleanup.take();
- if (action instanceof KillJobAction) {
- purgeJob((KillJobAction) action);
- } else if (action instanceof KillTaskAction) {
- TaskInProgress tip;
- KillTaskAction killAction = (KillTaskAction) action;
- synchronized (TaskTracker.this) {
- tip = tasks.get(killAction.getTaskID());
- }
- LOG.info("Received KillTaskAction for task: " +
- killAction.getTaskID());
- purgeTask(tip, false);
- } else {
- LOG.error("Non-delete action given to cleanup thread: "
- + action);
- }
- } catch (Throwable except) {
- LOG.warn(StringUtils.stringifyException(except));
- }
- }
- }
- }, "taskCleanup");
+ private TaskCleanupThread taskCleanupThread;
private RunningJob addTaskToJob(JobID jobId,
TaskInProgress tip) {
@@ -481,11 +466,24 @@
}
/**
- * Do the real constructor work here. It's in a separate method
+ * Initialize the connection.
+ * This method will block until a job tracker is found.
+ * It's in a separate method
* so we can call it again and "recycle" the object after calling
* close().
*/
synchronized void initialize() throws IOException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Initializing Task Tracker: " + this);
+ }
+ //check that the service is in a state in which it may be (re)initialized.
+
+ //allow this operation in only two service states: started and live
+ verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+ //flip the running switch for our inner threads
+ running = true;
+
// use configured nameserver & interface to get local hostname
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
@@ -572,10 +570,17 @@
DistributedCache.purgeCache(this.fConf);
cleanupStorage();
+ //mark as just started; this is used in heartbeats
+ this.justStarted = true;
+ int connectTimeout = fConf
+ .getInt("mapred.task.tracker.connect.timeout", 60000);
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
- jobTrackAddr, this.fConf);
+ jobTrackAddr, fConf, connectTimeout);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+ }
this.justInited = true;
this.running = true;
// start the thread that will fetch map task completion events
@@ -611,7 +616,9 @@
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- this.fConf.deleteLocalFiles();
+ if (fConf != null) {
+ fConf.deleteLocalFiles();
+ }
}
// Object on wait which MapEventsFetcherThread is going to wait.
@@ -890,25 +897,117 @@
}
}
- public synchronized void shutdown() throws IOException {
- shuttingDown = true;
- close();
- if (this.server != null) {
- try {
- LOG.info("Shutting down StatusHttpServer");
- this.server.stop();
+ /////////////////////////////////////////////////////
+ // Service Lifecycle
+ /////////////////////////////////////////////////////
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IOException for any problem.
+ */
+ @Override
+ protected synchronized void innerStart() throws IOException {
+ JobConf conf = fConf;
+ maxCurrentMapTasks = conf.getInt(
+ "mapred.tasktracker.map.tasks.maximum", 2);
+ maxCurrentReduceTasks = conf.getInt(
+ "mapred.tasktracker.reduce.tasks.maximum", 2);
+ this.jobTrackAddr = JobTracker.getAddress(conf);
+ String infoAddr =
+ NetUtils.getServerAddress(conf,
+ "tasktracker.http.bindAddress",
+ "tasktracker.http.port",
+ "mapred.task.tracker.http.address");
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+ String httpBindAddress = infoSocAddr.getHostName();
+ int port = infoSocAddr.getPort();
+ this.server = new HttpServer("task", httpBindAddress, port,
+ port == 0, conf);
+ workerThreads = conf.getInt("tasktracker.http.threads", 40);
+ this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
+ server.setThreads(1, workerThreads);
+ // let the jsp pages get to the task tracker, config, and other relevant
+ // objects
+ FileSystem local = FileSystem.getLocal(conf);
+ this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ server.setAttribute("task.tracker", this);
+ server.setAttribute("local.file.system", local);
+ server.setAttribute("conf", conf);
+ server.setAttribute("log", LOG);
+ server.setAttribute("localDirAllocator", localDirAllocator);
+ server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+ server.addInternalServlet("mapOutput", "/mapOutput",
+ MapOutputServlet.class);
+ server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+ server.start();
+ this.httpPort = server.getPort();
+ initialize();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param status a status that can be updated with problems
+ * @throws IOException for any problem
+ */
+ @Override
+ public void innerPing(ServiceStatus status) throws IOException {
+ if (server == null || !server.isAlive()) {
+ status.addThrowable(
+ new IOException("TaskTracker HttpServer is not running on port "
+ + httpPort));
+ }
+ if (taskReportServer == null) {
+ status.addThrowable(
+ new IOException("TaskTracker Report Server is not running on "
+ + taskReportAddress));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IOException exceptions which will be logged
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ synchronized (this) {
+ shuttingDown = true;
+ closeTaskTracker();
+ if (this.server != null) {
+ try {
+ LOG.info("Shutting down StatusHttpServer");
+ this.server.stop();
} catch (Exception e) {
LOG.warn("Exception shutting down TaskTracker", e);
+ }
}
+ stopCleanupThreads();
}
}
+
+ /**
+ * A shutdown request triggers termination
+ * @throws IOException when errors happen during termination
+ */
+ public synchronized void shutdown() throws IOException {
+ close();
+ }
+
/**
* Close down the TaskTracker and all its components. We must also shutdown
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
+ * @throws IOException when errors happen during shutdown
*/
- public synchronized void close() throws IOException {
+ public synchronized void closeTaskTracker() throws IOException {
+ if (!running) {
+ //this operation is a no-op when not already running
+ return;
+ }
+ running = false;
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -920,27 +1019,37 @@
tip.jobHasFinished(false);
}
- this.running = false;
-
// Clear local storage
cleanupStorage();
// Shutdown the fetcher thread
- this.mapEventsFetcher.interrupt();
+ if (mapEventsFetcher != null) {
+ mapEventsFetcher.interrupt();
+ }
//stop the launchers
- this.mapLauncher.interrupt();
- this.reduceLauncher.interrupt();
-
- jvmManager.stop();
+ if (mapLauncher != null) {
+ mapLauncher.cleanTaskQueue();
+ mapLauncher.interrupt();
+ }
+ if (reduceLauncher != null) {
+ reduceLauncher.cleanTaskQueue();
+ reduceLauncher.interrupt();
+ }
+ if (jvmManager != null) {
+ jvmManager.stop();
+ }
+
// shutdown RPC connections
RPC.stopProxy(jobClient);
// wait for the fetcher thread to exit
for (boolean done = false; !done; ) {
try {
- this.mapEventsFetcher.join();
+ if(mapEventsFetcher != null) {
+ mapEventsFetcher.join();
+ }
done = true;
} catch (InterruptedException e) {
}
@@ -953,52 +1062,54 @@
}
/**
- * Start with the local machine name, and the default JobTracker
+ * Create and start a task tracker.
+ * Subclasses must not invoke this constructor, as it may
+ * call their initialisation/startup methods before the subclass constructor
+ * is complete.
+ * It is here for backwards compatibility.
+ * @param conf configuration
+ * @throws IOException for problems on startup
*/
public TaskTracker(JobConf conf) throws IOException {
+ this(conf, true);
+ }
+
+ /**
+ * Subclasses should chain to this constructor and pass start=false, which
+ * avoids race conditions between constructors and threads.
+ * @param conf configuration
+ * @param start flag to set to true to start the daemon
+ * @throws IOException for problems on startup
+ */
+ protected TaskTracker(JobConf conf, boolean start) throws IOException {
+ super(conf);
fConf = conf;
- maxCurrentMapTasks = conf.getInt(
- "mapred.tasktracker.map.tasks.maximum", 2);
- maxCurrentReduceTasks = conf.getInt(
- "mapred.tasktracker.reduce.tasks.maximum", 2);
- this.jobTrackAddr = JobTracker.getAddress(conf);
- String infoAddr =
- NetUtils.getServerAddress(conf,
- "tasktracker.http.bindAddress",
- "tasktracker.http.port",
- "mapred.task.tracker.http.address");
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
- String httpBindAddress = infoSocAddr.getHostName();
- int httpPort = infoSocAddr.getPort();
- this.server = new HttpServer("task", httpBindAddress, httpPort,
- httpPort == 0, conf);
- workerThreads = conf.getInt("tasktracker.http.threads", 40);
- this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
- server.setThreads(1, workerThreads);
- // let the jsp pages get to the task tracker, config, and other relevant
- // objects
- FileSystem local = FileSystem.getLocal(conf);
- this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
- server.setAttribute("task.tracker", this);
- server.setAttribute("local.file.system", local);
- server.setAttribute("conf", conf);
- server.setAttribute("log", LOG);
- server.setAttribute("localDirAllocator", localDirAllocator);
- server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
- server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
- server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
- server.start();
- this.httpPort = server.getPort();
- initialize();
+ //for backwards compatibility, the task tracker starts up unless told not
+ //to. Subclasses should be very cautious about having their superclass
+ //do that as subclassed methods can be invoked before the class is fully
+ //configured
+ if(start) {
+ deploy(this);
+ }
}
private void startCleanupThreads() throws IOException {
- taskCleanupThread.setDaemon(true);
+ taskCleanupThread = new TaskCleanupThread();
taskCleanupThread.start();
directoryCleanupThread = new CleanupQueue();
}
/**
+ * Tell the task cleanup thread that it should end itself
+ */
+ private void stopCleanupThreads() {
+ if (taskCleanupThread != null) {
+ taskCleanupThread.terminate();
+ taskCleanupThread = null;
+ }
+ }
+
+ /**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
*/
@@ -1045,6 +1156,7 @@
*/
State offerService() throws Exception {
long lastHeartbeat = 0;
+ boolean restartingService = true;
while (running && !shuttingDown) {
try {
@@ -1060,6 +1172,7 @@
// 1. Verify the buildVersion
// 2. Get the system directory & filesystem
if(justInited) {
+ LOG.debug("Checking build version with JobTracker");
String jobTrackerBV = jobClient.getBuildVersion();
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
@@ -1069,7 +1182,7 @@
try {
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
} catch(Exception e ) {
- LOG.info("Problem reporting to jobtracker: " + e);
+ LOG.info("Problem reporting to jobtracker: " + e, e);
}
return State.DENIED;
}
@@ -1080,6 +1193,9 @@
}
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(fConf);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("System directory is " + systemDirectory);
+ }
}
// Send the heartbeat and process the jobtracker's directives
@@ -1132,6 +1248,15 @@
return State.STALE;
}
+ //At this point the job tracker is present and compatible,
+ //so the service is coming up.
+ //It is time to declare it as such
+ if (restartingService) {
+ //declare the service as live.
+ enterLiveState();
+ restartingService = false;
+ }
+
// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
@@ -1157,15 +1282,16 @@
//we've cleaned up, resume normal operation
if (!acceptNewTasks && isIdle()) {
+ LOG.info("ready to accept new tasks again");
acceptNewTasks=true;
}
} catch (InterruptedException ie) {
LOG.info("Interrupted. Closing down.");
return State.INTERRUPTED;
} catch (DiskErrorException de) {
- String msg = "Exiting task tracker for disk error:\n" +
+ String msg = "Exiting task tracker for disk error:\n" +
StringUtils.stringifyException(de);
- LOG.error(msg);
+ LOG.error(msg, de);
synchronized (this) {
jobClient.reportTaskTrackerError(taskTrackerName,
"DiskErrorException", msg);
@@ -1177,10 +1303,9 @@
LOG.info("Tasktracker disallowed by JobTracker.");
return State.DENIED;
}
- } catch (Exception except) {
- String msg = "Caught exception: " +
- StringUtils.stringifyException(except);
- LOG.error(msg);
+ } catch (IOException except) {
+ String msg = "Caught exception: " + except;
+ LOG.error(msg, except);
}
}
@@ -1491,6 +1616,7 @@
localMinSpaceKill = minSpaceKill;
}
if (!enoughFreeSpace(localMinSpaceKill)) {
+ LOG.info("Tasktracker running out of space -not accepting new tasks");
acceptNewTasks=false;
//we give up! do not accept new tasks until
//all the ones running have finished and they're all cleared up
@@ -1752,7 +1878,7 @@
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
- LOG.warn(msg);
+ LOG.warn(msg, e);
tip.reportDiagnosticInfo(msg);
try {
tip.kill(true);
@@ -1812,17 +1938,22 @@
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
+ //enter the started state; we are no longer live
+ enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
+ LOG.info("Interrupted while waiting for the job tracker", ie);
}
}
}
}
} finally {
- close();
+ closeTaskTracker();
+ }
+ if (shuttingDown) {
+ return;
}
- if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
initialize();
}
@@ -1830,8 +1961,7 @@
shutdown();
}
} catch (IOException iex) {
- LOG.error("Got fatal exception while reinitializing TaskTracker: " +
- StringUtils.stringifyException(iex));
+ LOG.error("Got fatal exception while reinitializing TaskTracker: " + iex, iex);
return;
}
}
@@ -2502,6 +2632,20 @@
}
String taskDir = getLocalTaskDir(task.getJobID().toString(),
taskId.toString(), task.isTaskCleanupTask());
+ CleanupQueue cleaner = directoryCleanupThread;
+ boolean cleanupThread = cleaner != null; // true only when a cleanup queue exists
+ if (!cleanupThread) {
+ LOG.info("Cannot clean up: no directory cleanup thread");
+ }
+ if (taskDir == null) {
+ throw new IOException("taskDir==null");
+ }
+ if(localJobConf==null) {
+ throw new IOException("localJobConf==null");
+ }
+ if (defaultJobConf == null) {
+ throw new IOException("defaultJobConf==null");
+ }
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2785,7 +2929,17 @@
String getName() {
return taskTrackerName;
}
-
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the name of this service
+ */
+ @Override
+ public String getServiceName() {
+ return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+ }
+
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
boolean sendCounters) {
List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -2902,7 +3056,9 @@
// enable the server to track time spent waiting on locks
ReflectionUtils.setContentionTracing
(conf.getBoolean("tasktracker.contention.tracking", false));
- new TaskTracker(conf).run();
+ TaskTracker tracker = new TaskTracker(conf, false);
+ Service.deploy(tracker);
+ tracker.run();
} catch (Throwable e) {
LOG.error("Can not start task tracker because "+
StringUtils.stringifyException(e));
@@ -3220,8 +3376,70 @@
try {
purgeTask(tip, wasFailure); // Marking it as failed/killed.
} catch (IOException ioe) {
- LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+ LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
}
}
}
+
+ /**
+ * Daemon thread that processes queued actions to kill a job or task
+ */
+ private class TaskCleanupThread extends Daemon {
+
+ /**
+ * flag to halt work
+ */
+ private volatile boolean live = true;
+
+
+ /**
+ * Construct a daemon thread.
+ */
+ private TaskCleanupThread() {
+ setName("Task Tracker Task Cleanup Thread");
+ }
+
+ /**
+ * End the daemon. This is done by setting the live flag to false and
+ * interrupting ourselves.
+ */
+ public void terminate() {
+ live = false;
+ interrupt();
+ }
+
+ /**
+ * process task kill actions until told to stop being live.
+ */
+ public void run() {
+ LOG.debug("Task cleanup thread started");
+ while (live) {
+ try {
+ TaskTrackerAction action = tasksToCleanup.take();
+ if (action instanceof KillJobAction) {
+ purgeJob((KillJobAction) action);
+ } else if (action instanceof KillTaskAction) {
+ TaskInProgress tip;
+ KillTaskAction killAction = (KillTaskAction) action;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " +
+ killAction.getTaskID());
+ purgeTask(tip, false);
+ } else {
+ LOG.error("Non-delete action given to cleanup thread: "
+ + action);
+ }
+ } catch (InterruptedException except) {
+ //interrupted: terminate() may have cleared the live flag, so re-check it
+ } catch (Throwable except) {
+ LOG.warn("Exception in Cleanup thread: " + except,
+ except);
+ }
+ }
+ LOG.debug("Task cleanup thread ending");
+ }
+ }
+
}
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Mar 19 12:15:51 2009
@@ -107,6 +107,15 @@
return actionType;
}
+ /**
+ * {@inheritDoc}
+ * @return a string naming the action type.
+ */
+ @Override
+ public String toString() {
+ return "TaskTrackerAction: " + actionType;
+ }
+
public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, actionType);
}
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Mar 19 12:15:51 2009
@@ -301,6 +301,18 @@
}
/**
+ * Returns a string describing the basic status of the task tracker.
+ * @return a string value for diagnostics
+ */
+ @Override
+ public String toString() {
+ return trackerName
+ + " at http://" + host + ":" + httpPort + "/"
+ + " current task count: " + taskReports.size()
+ + " failed task count: " + failures;
+ }
+
+ /**
* Return the {@link ResourceStatus} object configured with this
* status.
*
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Mar 19 12:15:51 2009
@@ -79,7 +79,7 @@
FileOutputFormat.setOutputPath(conf, outDir);
- JobClient.runJob(conf);
+ runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
@@ -100,4 +100,14 @@
}
+ /**
+ * Run a job, getting the timeout from a system property
+ * @param conf job configuration to use
+ * @throws IOException for any problem, including job failure
+ */
+ private void runJob(JobConf conf) throws IOException {
+ long timeout = Long.getLong("test.jobclient.timeout", 0);
+ JobClient.runJob(conf, timeout);
+ }
+
}
\ No newline at end of file
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Mar 19 12:15:51 2009
@@ -64,6 +64,15 @@
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+');
+ private MiniDFSCluster cluster;
+
+ /**
+ * terminate any non-null cluster
+ */
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ MiniDFSCluster.close(cluster);
+ }
/** class MyFile contains enough information to recreate the contents of
* a single file.
@@ -269,8 +278,6 @@
/** copy files from dfs file system to dfs file system */
public void testCopyFromDfsToDfs() throws Exception {
String namenode = null;
- MiniDFSCluster cluster = null;
- try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
@@ -291,15 +298,10 @@
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
/** copy files from local file system to dfs file system */
public void testCopyFromLocalToDfs() throws Exception {
- MiniDFSCluster cluster = null;
- try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 1, true, null);
final FileSystem hdfs = cluster.getFileSystem();
@@ -319,15 +321,10 @@
deldir(hdfs, "/logs");
deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
/** copy files from dfs file system to local file system */
public void testCopyFromDfsToLocal() throws Exception {
- MiniDFSCluster cluster = null;
- try {
Configuration conf = new Configuration();
final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
cluster = new MiniDFSCluster(conf, 1, true, null);
@@ -348,16 +345,11 @@
deldir(hdfs, "/logs");
deldir(hdfs, "/srcdat");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(conf, 2, true, null);
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = hdfs.getUri().toString();
if (namenode.startsWith("hdfs://")) {
@@ -408,9 +400,7 @@
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
+
}
public void testCopyDuplication() throws Exception {
@@ -486,11 +476,9 @@
public void testPreserveOption() throws Exception {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = null;
- try {
- cluster = new MiniDFSCluster(conf, 2, true, null);
- String nnUri = FileSystem.getDefaultUri(conf).toString();
- FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ String nnUri = FileSystem.getDefaultUri(conf).toString();
+ FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
{//test preserving user
MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
@@ -551,19 +539,15 @@
deldir(fs, "/destdat");
deldir(fs, "/srcdat");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
}
- }
public void testMapCount() throws Exception {
String namenode = null;
- MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
try {
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(conf, 3, true, null);
- FileSystem fs = dfs.getFileSystem();
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs = cluster.getFileSystem();
final FsShell shell = new FsShell(conf);
namenode = fs.getUri().toString();
mr = new MiniMRCluster(3, namenode, 1);
@@ -604,8 +588,7 @@
assertTrue("Unexpected map count, logs.length=" + logs.length,
logs.length == 2);
} finally {
- if (dfs != null) { dfs.shutdown(); }
- if (mr != null) { mr.shutdown(); }
+ MiniMRCluster.close(mr);
}
}
@@ -714,9 +697,7 @@
}
public void testHftpAccessControl() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
+ final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
final UnixUserGroupInformation USER_UGI = createUGI("user", false);
//start cluster by DFS_UGI
@@ -750,9 +731,6 @@
fs.setPermission(srcrootpath, new FsPermission((short)0));
assertEquals(-3, ToolRunner.run(distcp, args));
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
/** test -delete */
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 19 12:15:51 2009
@@ -65,25 +65,25 @@
private static final long MEGA = 1024 * 1024;
private static final int SEEKS_PER_FILE = 4;
- private static String ROOT = System.getProperty("test.build.data","fs_test");
- private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
- private static Path WRITE_DIR = new Path(ROOT, "fs_write");
- private static Path READ_DIR = new Path(ROOT, "fs_read");
- private static Path DATA_DIR = new Path(ROOT, "fs_data");
+ private static final String ROOT = System.getProperty("test.build.data","fs_test");
+ private static final Path CONTROL_DIR = new Path(ROOT, "fs_control");
+ private static final Path WRITE_DIR = new Path(ROOT, "fs_write");
+ private static final Path READ_DIR = new Path(ROOT, "fs_read");
+ private static final Path DATA_DIR = new Path(ROOT, "fs_data");
public void testFs() throws Exception {
- testFs(10 * MEGA, 100, 0);
+ createTestFs(10 * MEGA, 100, 0);
}
- public static void testFs(long megaBytes, int numFiles, long seed)
+ private static void createTestFs(long megaBytes, int numFiles, long seed)
throws Exception {
FileSystem fs = FileSystem.get(conf);
- if (seed == 0)
+ if (seed == 0) {
seed = new Random().nextLong();
-
- LOG.info("seed = "+seed);
+ LOG.info("seed = " + seed);
+ }
createControlFile(fs, megaBytes, numFiles, seed);
writeTest(fs, false);
@@ -553,7 +553,7 @@
}
}
} finally {
- if (cluster != null) cluster.shutdown();
+ MiniDFSCluster.close(cluster);
}
}
@@ -563,30 +563,44 @@
fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
}
- public void testFsClose() throws Exception {
- {
- Configuration conf = new Configuration();
- new Path("file:///").getFileSystem(conf);
- UnixUserGroupInformation.login(conf, true);
- FileSystem.closeAll();
- }
+ public void testCloseFileFS() throws Exception {
+ Configuration conf = new Configuration();
+ new Path("file:///").getFileSystem(conf);
+ UnixUserGroupInformation.login(conf, true);
+ FileSystem.closeAll();
+ }
- {
+ public void testCloseHftpFS() throws Exception {
Configuration conf = new Configuration();
new Path("hftp://localhost:12345/").getFileSystem(conf);
UnixUserGroupInformation.login(conf, true);
FileSystem.closeAll();
- }
+ }
- {
+ public void testCloseHftpFSAltLogin() throws Exception {
+ Configuration conf = new Configuration();
+ FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
+ UnixUserGroupInformation.login(fs.getConf(), true);
+ FileSystem.closeAll();
+ }
+
+
+ public void testCloseHDFS() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+ URI uri = cluster.getFileSystem().getUri();
+ FileSystem fs = FileSystem.get(uri, new Configuration());
+ checkPath(cluster, fs);
Configuration conf = new Configuration();
- FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
- UnixUserGroupInformation.login(fs.getConf(), true);
+ new Path(uri.toString()).getFileSystem(conf);
+ UnixUserGroupInformation.login(conf, true);
FileSystem.closeAll();
+ } finally {
+ MiniDFSCluster.close(cluster);
}
}
-
public void testCacheKeysAreCaseInsensitive()
throws Exception
{
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Mar 19 12:15:51 2009
@@ -25,12 +25,14 @@
import java.nio.channels.FileChannel;
import java.util.Random;
import java.io.RandomAccessFile;
+import java.io.Closeable;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.*;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -43,13 +45,18 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.*;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* This class creates a single-process DFS cluster for junit testing.
* The data directories for non-simulated DFS are under the testing directory.
* For simulated data nodes, no underlying fs storage is used.
*/
-public class MiniDFSCluster {
+public class MiniDFSCluster implements Closeable {
+ private static final int WAIT_SLEEP_TIME_MILLIS = 500;
+ private static final int STARTUP_TIMEOUT_MILLIS = 30000;
public class DataNodeProperties {
DataNode datanode;
@@ -63,6 +70,7 @@
}
}
+ private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
private Configuration conf;
private NameNode nameNode;
private int numDataNodes;
@@ -281,17 +289,18 @@
}
/**
- * wait for the cluster to get out of
+ * wait for the cluster to get out of
* safemode.
*/
public void waitClusterUp() {
if (numDataNodes > 0) {
- while (!isClusterUp()) {
- try {
- System.err.println("Waiting for the Mini HDFS Cluster to start...");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
+ try {
+ while (!isClusterUp()) {
+ LOG.warn("Waiting for the Mini HDFS Cluster to start...");
+ Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
}
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted during startup", e);
}
}
}
@@ -323,7 +332,6 @@
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
long[] simulatedCapacities) throws IOException {
-
int curDatanodesNum = dataNodes.size();
// for mincluster's the default initialDelay for BRs is 0
if (conf.get("dfs.blockreport.initialDelay") == null) {
@@ -350,7 +358,7 @@
}
//Generate some hostnames if required
if (racks != null && hosts == null) {
- System.out.println("Generating host names for datanodes");
+ LOG.info("Generating host names for datanodes");
hosts = new String[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
@@ -393,16 +401,16 @@
dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
simulatedCapacities[i-curDatanodesNum]);
}
- System.out.println("Starting DataNode " + i + " with dfs.data.dir: "
+ LOG.info("Starting DataNode " + i + " with dfs.data.dir: "
+ dnConf.get("dfs.data.dir"));
if (hosts != null) {
dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
- System.out.println("Starting DataNode " + i + " with hostname set to: "
+ LOG.info("Starting DataNode " + i + " with hostname set to: "
+ dnConf.get("slave.host.name"));
}
if (racks != null) {
String name = hosts[i - curDatanodesNum];
- System.out.println("Adding node with hostname : " + name + " to rack "+
+ LOG.info("Adding node with hostname : " + name + " to rack "+
racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(name,
racks[i-curDatanodesNum]);
@@ -417,7 +425,7 @@
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
if (racks != null) {
int port = dn.getSelfAddr().getPort();
- System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
+ LOG.info("Adding node with IP:port : " + ipAddr + ":" + port+
" to rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port,
racks[i-curDatanodesNum]);
@@ -548,8 +556,8 @@
/**
* Shut down the servers that are up.
*/
- public void shutdown() {
- System.out.println("Shutting down the Mini HDFS Cluster");
+ public synchronized void shutdown() {
+ LOG.info("Shutting down the Mini HDFS Cluster");
shutdownDataNodes();
if (nameNode != null) {
nameNode.stop();
@@ -557,20 +565,57 @@
nameNode = null;
}
}
-
+
+ /**
+ * Shuts down the cluster.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ public void close() throws IOException {
+ shutdown();
+ }
+
+ /**
+ * Static operation to shut down a cluster;
+ * harmless if the cluster argument is null
+ *
+ * @param cluster cluster to shut down, or null for no cluster
+ */
+ public static void close(Closeable cluster) {
+ Service.close(cluster);
+ }
+
/**
* Shutdown all DataNodes started by this class. The NameNode
* is left running so that new DataNodes may be started.
*/
public void shutdownDataNodes() {
for (int i = dataNodes.size()-1; i >= 0; i--) {
- System.out.println("Shutting down DataNode " + i);
+ LOG.info("Shutting down DataNode " + i);
DataNode dn = dataNodes.remove(i).datanode;
dn.shutdown();
numDataNodes--;
}
}
+ /**
+ * Returns a string representation of the cluster.
+ *
+ * @return a string representation of the cluster
+ */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Cluster up:").append(isClusterUp());
+ builder.append("\nName Node:").append(getNameNode());
+ builder.append("\nData node count:").append(dataNodes.size());
+ for (DataNodeProperties dnp : dataNodes) {
+ builder.append("\n Datanode: ").append(dnp.datanode);
+ builder.append("\n state: ").append(dnp.datanode.getServiceState());
+ }
+ return builder.toString();
+ }
+
/*
* Corrupt a block on all datanode
*/
@@ -595,12 +640,15 @@
if (blockFile.exists()) {
// Corrupt replica by writing random bytes into replica
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- FileChannel channel = raFile.getChannel();
- String badString = "BADBAD";
- int rand = random.nextInt((int)channel.size()/2);
- raFile.seek(rand);
- raFile.write(badString.getBytes());
- raFile.close();
+ try {
+ FileChannel channel = raFile.getChannel();
+ String badString = "BADBAD";
+ int rand = random.nextInt((int) channel.size() / 2);
+ raFile.seek(rand);
+ raFile.write(badString.getBytes());
+ } finally {
+ raFile.close();
+ }
}
corrupted = true;
}
@@ -616,7 +664,7 @@
}
DataNodeProperties dnprop = dataNodes.remove(i);
DataNode dn = dnprop.datanode;
- System.out.println("MiniDFSCluster Stopping DataNode " +
+ LOG.info("MiniDFSCluster Stopping DataNode " +
dn.dnRegistration.getName() +
" from a total of " + (dataNodes.size() + 1) +
" datanodes.");
@@ -655,8 +703,10 @@
}
}
- /*
+ /**
* Shutdown a datanode by name.
+ * @param name datanode name
+ * @return the properties of the datanode that was shut down
*/
public synchronized DataNodeProperties stopDataNode(String name) {
int i;
@@ -670,7 +720,7 @@
}
/**
- * Returns true if the NameNode is running and is out of Safe Mode.
+ * @return true if the NameNode is running and is out of Safe Mode.
*/
public boolean isClusterUp() {
if (nameNode == null) {
@@ -685,7 +735,7 @@
}
/**
- * Returns true if there is at least one DataNode running.
+ * @return true if there is at least one DataNode running.
*/
public boolean isDataNodeUp() {
if (dataNodes == null || dataNodes.size() == 0) {
@@ -696,13 +746,15 @@
/**
* Get a client handle to the DFS cluster.
+ * @return a new filesystem, which must be closed when no longer needed.
+ * @throws IOException if the filesystem cannot be created
*/
public FileSystem getFileSystem() throws IOException {
return FileSystem.get(conf);
}
/**
- * Get the directories where the namenode stores its image.
+ * @return the directories where the namenode stores its state.
*/
public Collection<File> getNameDirs() {
return FSNamesystem.getNamespaceDirs(conf);
@@ -725,17 +777,27 @@
InetSocketAddress addr = new InetSocketAddress("localhost",
getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
+ try {
+ DatanodeInfo[] dnInfos;
- // make sure all datanodes are alive
- while(client.datanodeReport(DatanodeReportType.LIVE).length
- != numDataNodes) {
- try {
- Thread.sleep(500);
- } catch (Exception e) {
+ // make sure all datanodes are alive
+ long timeout=System.currentTimeMillis() + STARTUP_TIMEOUT_MILLIS;
+ while((dnInfos = client.datanodeReport(DatanodeReportType.LIVE)).length
+ != numDataNodes) {
+ try {
+ Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
+ if(System.currentTimeMillis() > timeout) {
+ throw new IOException("Timeout waiting for the datanodes. "+
+ "Expected " + numDataNodes + "but got " + dnInfos.length);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while waiting for the datanodes",e);
+ }
}
+ } finally {
+ client.close();
}
- client.close();
}
public void formatDataNodeDirs() throws IOException {
@@ -824,7 +886,7 @@
}
/**
- * Returns the current set of datanodes
+ * @return the current set of datanodes
*/
DataNode[] listDataNodes() {
DataNode[] list = new DataNode[dataNodes.size()];
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Thu Mar 19 12:15:51 2009
@@ -58,7 +58,7 @@
config = new Configuration();
config.set("dfs.name.dir", new File(hdfsDir, "name1").getPath());
FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
- config.set("dfs.http.address", NAME_NODE_HTTP_HOST + "0");
+ config.set("dfs.http.address", "0.0.0.0:0");
NameNode.format(config);
String[] args = new String[] {};
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java Thu Mar 19 12:15:51 2009
@@ -19,7 +19,6 @@
import junit.framework.TestCase;
import java.io.*;
-import java.util.Iterator;
import java.util.Random;
import java.net.*;
@@ -71,7 +70,7 @@
Configuration conf = fileSys.getConf();
ClientProtocol namenode = DFSClient.createNamenode(conf);
- waitForBlockReplication(name.toString(), namenode,
+ waitForBlockReplication(name.toString(), namenode,
Math.min(numDatanodes, repl), -1);
LocatedBlocks locations = namenode.getBlockLocations(name.toString(),0,
@@ -169,9 +168,9 @@
// Now get block details and check if the block is corrupt
blocks = dfsClient.namenode.
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ LOG.info("Waiting until block is marked as corrupt...");
while (blocks.get(0).isCorrupt() != true) {
try {
- LOG.info("Waiting until block is marked as corrupt...");
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
@@ -249,16 +248,14 @@
boolean replOk = true;
LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
Long.MAX_VALUE);
-
- for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
- iter.hasNext();) {
- LocatedBlock block = iter.next();
+
+ for (LocatedBlock block : blocks.getLocatedBlocks()) {
int actual = block.getLocations().length;
- if ( actual < expected ) {
+ if (actual < expected) {
if (true || iters > 0) {
LOG.info("Not enough replicas for " + block.getBlock() +
- " yet. Expecting " + expected + ", got " +
- actual + ".");
+ " yet. Expecting " + expected + ", got " +
+ actual + ".");
}
replOk = false;
break;
@@ -385,10 +382,8 @@
waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
} finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ MiniDFSCluster.close(cluster);
+ }
}
/**
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Thu Mar 19 12:15:51 2009
@@ -137,6 +137,11 @@
editLog.close();
+ //check that the namesystem is still healthy
+ assertNotNull("FSNamesystem.getFSNamesystem() is null",
+ FSNamesystem.getFSNamesystem());
+ assertNotNull("FSNamesystem.getFSNamesystem().dir is null",
+ FSNamesystem.getFSNamesystem().dir);
// Verify that we can read in all the transactions that we have written.
// If there were any corruptions, it is likely that the reading in
// of these transactions will throw an exception.
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Thu Mar 19 12:15:51 2009
@@ -93,7 +93,7 @@
config = props;
}
- public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+ private ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
int numDir) throws Exception {
super(numTaskTrackers, namenode, numDir);
}
@@ -121,14 +121,10 @@
* @throws Exception if the cluster could not be stopped
*/
protected void stopCluster() throws Exception {
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
+ MiniMRCluster.close(mrCluster);
+ mrCluster = null;
+ MiniDFSCluster.close(dfsCluster);
+ dfsCluster = null;
}
/**
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java Thu Mar 19 12:15:51 2009
@@ -163,22 +163,8 @@
* @throws Exception
*/
protected void tearDown() throws Exception {
- try {
- if (mrCluster != null) {
- mrCluster.shutdown();
- }
- }
- catch (Exception ex) {
- System.out.println(ex);
- }
- try {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- }
- }
- catch (Exception ex) {
- System.out.println(ex);
- }
+ MiniMRCluster.close(mrCluster);
+ MiniDFSCluster.close(dfsCluster);
super.tearDown();
}
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Mar 19 12:15:51 2009
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -31,12 +32,13 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.Service;
/**
* This class creates a single-process Map-Reduce cluster for junit testing.
* One thread is created for each server.
*/
-public class MiniMRCluster {
+public class MiniMRCluster implements Closeable {
private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
private Thread jobTrackerThread;
@@ -52,9 +54,13 @@
private String namenode;
private UnixUserGroupInformation ugi = null;
+ private static final int TRACKER_STABILIZATION_TIMEOUT = 30000;
+
private JobConf conf;
private JobConf job;
+ /** time for a tracker to shut down : {@value} */
+ private static final long TRACKER_SHUTDOWN_TIMEOUT = 30000;
/**
* An inner class that runs a job tracker.
@@ -101,7 +107,7 @@
tracker = JobTracker.startTracker(jc);
tracker.offerService();
} catch (Throwable e) {
- LOG.error("Job tracker crashed", e);
+ LOG.error("Job tracker crashed: " + e, e);
isActive = false;
}
}
@@ -110,13 +116,12 @@
* Shutdown the job tracker and wait for it to finish.
*/
public void shutdown() {
- try {
- if (tracker != null) {
- tracker.stopTracker();
- }
- } catch (Throwable e) {
- LOG.error("Problem shutting down job tracker", e);
+ JobTracker jobTracker;
+ synchronized (this) {
+ jobTracker = tracker;
+ tracker = null;
}
+ Service.close(jobTracker);
isActive = false;
}
}
@@ -177,7 +182,7 @@
} catch (Throwable e) {
isDead = true;
tt = null;
- LOG.error("task tracker " + trackerId + " crashed", e);
+ LOG.error("task tracker " + trackerId + " crashed : "+e, e);
}
}
@@ -420,7 +425,7 @@
//Generate rack names if required
if (racks == null) {
- System.out.println("Generating rack names for tasktrackers");
+ LOG.info("Generating rack names for tasktrackers");
racks = new String[numTaskTrackers];
for (int i=0; i < racks.length; ++i) {
racks[i] = NetworkTopology.DEFAULT_RACK;
@@ -429,7 +434,7 @@
//Generate some hostnames if required
if (hosts == null) {
- System.out.println("Generating host names for tasktrackers");
+ LOG.info("Generating host names for tasktrackers");
hosts = new String[numTaskTrackers];
for (int i = 0; i < numTaskTrackers; i++) {
hosts[i] = "host" + i + ".foo.com";
@@ -470,6 +475,24 @@
}
this.job = createJobConf(conf);
+ // Wait till the MR cluster stabilizes
+ long timeout = System.currentTimeMillis() +
+ TRACKER_STABILIZATION_TIMEOUT;
+ while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted during startup");
+ }
+ if(System.currentTimeMillis() > timeout) {
+ String message = "Time out waiting for the task trackers to stabilize. "
+ + "Expected tracker count: " + numTaskTrackers
+ + " -actual count: "
+ + jobTracker.tracker.getNumResolvedTaskTrackers();
+ LOG.error(message);
+ throw new IOException(message);
+ }
+ }
waitUntilIdle();
}
@@ -583,12 +606,11 @@
* Kill the jobtracker.
*/
public void stopJobTracker() {
- //jobTracker.exit(-1);
jobTracker.shutdown();
jobTrackerThread.interrupt();
try {
- jobTrackerThread.join();
+ jobTrackerThread.join(TRACKER_SHUTDOWN_TIMEOUT);
} catch (InterruptedException ex) {
LOG.error("Problem waiting for job tracker to finish", ex);
}
@@ -649,6 +671,25 @@
}
}
+ /**
+ * Static operation to shut down a cluster; harmless if the cluster argument
+ * is null
+ *
+ * @param cluster cluster to shut down, or null for no cluster
+ */
+ public static void close(Closeable cluster) {
+ Service.close(cluster);
+ }
+
+ /**
+ * Shuts down the cluster.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ public void close() throws IOException {
+ shutdown();
+ }
+
public static void main(String[] args) throws IOException {
LOG.info("Bringing up Jobtracker and tasktrackers.");
MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1);
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Thu Mar 19 12:15:51 2009
@@ -35,6 +35,9 @@
*/
public class TestMRServerPorts extends TestCase {
TestHDFSServerPorts hdfs = new TestHDFSServerPorts();
+ private static final String STARTED_UNEXPECTEDLY
+ = "the Job tracker should not have started";
+ private static final String FAILED_TO_START = "The Job tracker did not start";
// Runs the JT in a separate thread
private static class JTRunner extends Thread {
@@ -85,7 +88,6 @@
return false;
throw e;
}
- jt.fs.close();
jt.stopTracker();
return true;
}
@@ -122,21 +124,21 @@
conf2.set("mapred.job.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
boolean started = canStartJobTracker(conf2);
- assertFalse(started); // should fail
+ assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
// bind http server to the same port as name-node
conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
conf2.set("mapred.job.tracker.http.address",
hdfs.getConfig().get("dfs.http.address"));
started = canStartJobTracker(conf2);
- assertFalse(started); // should fail again
+ assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
// both ports are different from the name-node ones
conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
conf2.set("mapred.job.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
started = canStartJobTracker(conf2);
- assertTrue(started); // should start now
+ assertTrue(FAILED_TO_START, started); // should start now
} finally {
hdfs.stopNameNode(nn);
@@ -163,7 +165,7 @@
conf2.set("mapred.task.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
boolean started = canStartTaskTracker(conf2);
- assertFalse(started); // should fail
+ assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
// bind http server to the same port as name-node
conf2.set("mapred.task.tracker.report.address",
@@ -171,7 +173,7 @@
conf2.set("mapred.task.tracker.http.address",
hdfs.getConfig().get("dfs.http.address"));
started = canStartTaskTracker(conf2);
- assertFalse(started); // should fail again
+ assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
// both ports are different from the name-node ones
conf2.set("mapred.task.tracker.report.address",
@@ -179,7 +181,7 @@
conf2.set("mapred.task.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
started = canStartTaskTracker(conf2);
- assertTrue(started); // should start now
+ assertTrue(FAILED_TO_START, started); // should start now
} finally {
if (jt != null) {
jt.fs.close();
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Thu Mar 19 12:15:51 2009
@@ -46,7 +46,12 @@
private MiniMRCluster mr;
private MiniDFSCluster dfs;
private FileSystem fileSys;
-
+ private static final String BAILING_OUT = "Bailing out";
+ private static final String TEST_SCRIPT_BAILING_OUT
+ = "Test Script\n"+ BAILING_OUT;
+ private static final int SCRIPT_SLEEP_TIMEOUT = 60000;
+ private static final int SCRIPT_SLEEP_INTERVAL = 1000;
+
/**
* Fail map class
*/
@@ -55,7 +60,7 @@
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
- System.err.println("Bailing out");
+ System.err.println(BAILING_OUT);
throw new IOException();
}
}
@@ -165,7 +170,17 @@
// construct the task id of first map task of failmap
TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
// wait for the job to finish.
- while (!job.isComplete()) ;
+ long timeout = System.currentTimeMillis() + SCRIPT_SLEEP_TIMEOUT;
+ while (!job.isComplete()) {
+ try {
+ Thread.sleep(SCRIPT_SLEEP_INTERVAL);
+ } catch (InterruptedException e) {
+ fail("Interrupted");
+ }
+ if(System.currentTimeMillis() > timeout) {
+ fail("Timeout waiting for the job to complete ");
+ }
+ }
// return the output of debugout log.
return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId);
@@ -204,7 +219,9 @@
outDir,debugDir, debugScript, input);
// Assert the output of debug script.
- assertEquals("Test Script\nBailing out", result);
+ if(!result.contains(TEST_SCRIPT_BAILING_OUT)) {
+ fail("Did not find " + TEST_SCRIPT_BAILING_OUT + "in \n" + result);
+ }
} finally {
// close file system and shut down dfs and mapred cluster
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Thu Mar 19 12:15:51 2009
@@ -112,13 +112,11 @@
*/
TestRackAwareTaskPlacement.launchJobAndTestCounters(
testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
- mr.shutdown();
} finally {
fileSys.delete(inDir, true);
fileSys.delete(outputPath, true);
- if (dfs != null) {
- dfs.shutdown();
- }
+ MiniMRCluster.close(mr);
+ MiniDFSCluster.close(dfs);
}
}
}
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Mar 19 12:15:51 2009
@@ -151,14 +151,11 @@
launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
0, 3);
mr.shutdown();
+ mr=null;
} finally {
- if (dfs != null) {
- dfs.shutdown();
- }
- if (mr != null) {
- mr.shutdown();
- }
+ MiniDFSCluster.close(dfs);
+ MiniMRCluster.close(mr);
}
}
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Thu Mar 19 12:15:51 2009
@@ -247,8 +247,13 @@
// they were running on a lost tracker
testSetupAndCleanupKill(mr, dfs, false);
} finally {
- if (dfs != null) { dfs.shutdown(); }
- if (mr != null) { mr.shutdown();
+ try {
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown();
+ }
+ } catch (OutOfMemoryError e) {
+ //ignore this, as it means something went very wrong in the test logging;
+ //any attempt to log it may make things worse
}
}
}
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Thu Mar 19 12:15:51 2009
@@ -81,8 +81,8 @@
runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
mr.waitUntilIdle();
} finally {
- mr.shutdown();
- dfs.shutdown();
+ MiniMRCluster.close(mr);
+ MiniDFSCluster.close(dfs);
}
}
|