Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb 5 17:39:27 2009
@@ -41,7 +41,7 @@
// what state is the task in?
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING}
+ COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
private final TaskAttemptID taskid;
private float progress;
@@ -204,6 +204,12 @@
}
this.phase = phase;
}
+
+ boolean inTaskCleanupPhase() {
+ return (this.phase == TaskStatus.Phase.CLEANUP &&
+ (this.runState == TaskStatus.State.FAILED_UNCLEAN ||
+ this.runState == TaskStatus.State.KILLED_UNCLEAN));
+ }
public boolean getIncludeCounters() {
return includeCounters;
@@ -261,9 +267,9 @@
/**
* Update the status of the task.
*
+ * @param runState
* @param progress
* @param state
- * @param phase
* @param counters
*/
synchronized void statusUpdate(State runState,
@@ -300,7 +306,33 @@
this.counters = status.getCounters();
this.outputSize = status.outputSize;
}
-
+
+ /**
+ * Update specific fields of task status
+ *
+ * This update is done in JobTracker when a cleanup attempt of task
+ * reports its status. Then update only specific fields, not all.
+ *
+ * @param runState
+ * @param progress
+ * @param state
+ * @param phase
+ * @param finishTime
+ */
+ synchronized void statusUpdate(State runState,
+ float progress,
+ String state,
+ Phase phase,
+ long finishTime) {
+ setRunState(runState);
+ setProgress(progress);
+ setStateString(state);
+ setPhase(phase);
+ if (finishTime != 0) {
+ this.finishTime = finishTime;
+ }
+ }
+
/**
* Clear out transient information after sending out a status-update
* from either the {@link Task} to the {@link TaskTracker} or from the
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 5 17:39:27 2009
@@ -182,7 +182,8 @@
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
- private static final String PIDDIR = "pids";
+ private static final String PID = "pid";
+ private static final String OUTPUT = "output";
private JobConf originalConf;
private JobConf fConf;
private int maxCurrentMapTasks;
@@ -412,10 +413,36 @@
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
- static String getPidFilesSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+ static String getLocalJobDir(String jobid) {
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
-
+
+ static String getLocalTaskDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid, false) ;
+ }
+
+ static String getIntermediateOutputDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid)
+ + Path.SEPARATOR + TaskTracker.OUTPUT ;
+ }
+
+ static String getLocalTaskDir(String jobid,
+ String taskid,
+ boolean isCleanupAttempt) {
+ String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ taskDir = taskDir + ".cleanup";
+ }
+ return taskDir;
+ }
+
+ static String getPidFile(String jobid,
+ String taskid,
+ boolean isCleanup) {
+ return getLocalTaskDir(jobid, taskid, isCleanup)
+ + Path.SEPARATOR + PID;
+ }
+
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -756,9 +783,9 @@
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
- + Path.SEPARATOR + jobId
- + Path.SEPARATOR + "job.xml"),
+ Path localJobFile = lDirAlloc.getLocalPathForWrite(
+ getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
@@ -782,9 +809,9 @@
// create the 'work' directory
// job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
- + Path.SEPARATOR + jobId
- + Path.SEPARATOR + "work"), fConf);
+ Path workDir = lDirAlloc.getLocalPathForWrite(
+ (getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "work"), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
@@ -806,8 +833,7 @@
// Here we check for and we check five times the size of jarFileSize
// to accommodate for unjarring the jar file in work directory
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getJobCacheSubdir()
- + Path.SEPARATOR + jobId
+ getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "jars",
5 * jarFileSize, fConf), "job.jar");
if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1224,7 +1250,8 @@
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
- taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !taskStatus.inTaskCleanupPhase()) {
if (taskStatus.getIsMap()) {
mapTotal--;
} else {
@@ -1341,7 +1368,8 @@
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
if (tip.getRunState() == TaskStatus.State.RUNNING ||
- tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ tip.isCleaningup()) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
long jobTaskTimeout = tip.getTaskTimeout();
@@ -1395,8 +1423,7 @@
// task if the job is done/failed
if (!rjob.keepJobFiles){
directoryCleanupThread.addToQueue(getLocalFiles(fConf,
- SUBDIR + Path.SEPARATOR + JOBCACHE +
- Path.SEPARATOR + rjob.getJobID()));
+ getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
rjob.tasks.clear();
@@ -1650,7 +1677,9 @@
}
synchronized (tip) {
//to make sure that there is no kill task action for this
- if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+ if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+ tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+ tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
//got killed externally while still in the launcher queue
addFreeSlot();
continue;
@@ -1671,7 +1700,8 @@
private TaskInProgress registerTask(LaunchTaskAction action,
TaskLauncher launcher) {
Task t = action.getTask();
- LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+ LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+ " task's state:" + t.getState());
TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized (this) {
tasks.put(t.getTaskID(), tip);
@@ -1693,10 +1723,6 @@
private void startNewTask(TaskInProgress tip) {
try {
localizeJob(tip);
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.addTask(tip.getTask().getTaskID(),
- getVirtualMemoryForTask(tip.getJobConf()));
- }
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -1717,7 +1743,23 @@
}
}
}
-
+
+ void addToMemoryManager(TaskAttemptID attemptId,
+ JobConf conf,
+ String pidFile) {
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.addTask(attemptId,
+ getVirtualMemoryForTask(conf), pidFile);
+ }
+ }
+
+ void removeFromMemoryManager(TaskAttemptID attemptId) {
+ // Remove the entry from taskMemoryManagerThread's data structures.
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.removeTask(attemptId);
+ }
+ }
+
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -1804,10 +1846,12 @@
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
0.0f,
- TaskStatus.State.UNASSIGNED,
+ task.getState(),
diagnosticInfo.toString(),
"initializing",
getName(),
+ task.isTaskCleanupTask() ?
+ TaskStatus.Phase.CLEANUP :
task.isMapTask()? TaskStatus.Phase.MAP:
TaskStatus.Phase.SHUFFLE,
task.getCounters());
@@ -1817,9 +1861,10 @@
private void localizeTask(Task task) throws IOException{
Path localTaskDir =
- lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() +
- Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
- task.getTaskID()), defaultJobConf );
+ lDirAlloc.getLocalPathForWrite(
+ TaskTracker.getLocalTaskDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task.isTaskCleanupTask()),
+ defaultJobConf );
FileSystem localFs = FileSystem.getLocal(fConf);
if (!localFs.mkdirs(localTaskDir)) {
@@ -1829,8 +1874,7 @@
// create symlink for ../work if it already doesnt exist
String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
+ TaskTracker.getLocalJobDir(task.getJobID().toString())
+ Path.SEPARATOR
+ "work", defaultJobConf).toString();
String link = localTaskDir.getParent().toString()
@@ -1841,11 +1885,10 @@
// create the working-directory of the task
Path cwd = lDirAlloc.getLocalPathForWrite(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
- + Path.SEPARATOR + task.getTaskID()
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ getLocalTaskDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task.isTaskCleanupTask())
+ + Path.SEPARATOR + MRConstants.WORKDIR,
+ defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
@@ -1943,9 +1986,13 @@
* Kick off the task execution
*/
public synchronized void launchTask() throws IOException {
- if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+ this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
localizeTask(task);
- this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ }
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1955,6 +2002,10 @@
}
}
+ boolean isCleaningup() {
+ return this.taskStatus.inTaskCleanupPhase();
+ }
+
/**
* The task is reporting its progress
*/
@@ -1962,10 +2013,14 @@
{
LOG.info(task.getTaskID() + " " + taskStatus.getProgress() +
"% " + taskStatus.getStateString());
-
+ // task will report its state as
+ // COMMIT_PENDING when it is waiting for commit response and
+ // when it is committing.
+ // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
if (this.done ||
(this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
- this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+ this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !isCleaningup())) {
//make sure we ignore progress messages after a task has
//invoked TaskUmbilicalProtocol.done() or if the task has been
//KILLED/FAILED
@@ -2016,7 +2071,16 @@
* The task is reporting that it's done running
*/
public synchronized void reportDone() {
- this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ if (isCleaningup()) {
+ if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ this.taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (this.taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ this.taskStatus.setRunState(TaskStatus.State.KILLED);
+ }
+ } else {
+ this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ }
this.taskStatus.setProgress(1.0f);
this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
@@ -2031,6 +2095,11 @@
return wasKilled;
}
+ void reportTaskFinished() {
+ taskFinished();
+ releaseSlot();
+ }
+
/**
* The task has actually finished running.
*/
@@ -2057,7 +2126,23 @@
if (!done) {
if (!wasKilled) {
failures += 1;
- taskStatus.setRunState(TaskStatus.State.FAILED);
+ /* State changes:
+ * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ */
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else if (task.isMapOrReduce() &&
+ taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+ taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+ } else {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ }
+ removeFromMemoryManager(task.getTaskID());
// call the script here for the failed tasks.
if (debugCommand != null) {
String taskStdout ="";
@@ -2083,9 +2168,10 @@
File workDir = null;
try {
workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
- + Path.SEPARATOR + task.getTaskID()
+ TaskTracker.getLocalTaskDir(
+ task.getJobID().toString(),
+ task.getTaskID().toString(),
+ task.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
localJobConf). toString());
} catch (IOException e) {
@@ -2138,14 +2224,14 @@
LOG.warn("Exception in add diagnostics!");
}
}
- } else {
- taskStatus.setRunState(TaskStatus.State.KILLED);
}
taskStatus.setProgress(0.0f);
}
this.taskStatus.setFinishTime(System.currentTimeMillis());
needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED ||
- taskStatus.getRunState() == TaskStatus.State.KILLED);
+ taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN ||
+ taskStatus.getRunState() == TaskStatus.State.KILLED);
}
//
@@ -2255,7 +2341,8 @@
synchronized(this){
if (getRunState() == TaskStatus.State.RUNNING ||
getRunState() == TaskStatus.State.UNASSIGNED ||
- getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ isCleaningup()) {
kill(wasFailure);
}
}
@@ -2269,16 +2356,38 @@
* @param wasFailure was it a failure (versus a kill request)?
*/
public synchronized void kill(boolean wasFailure) throws IOException {
+ /* State changes:
+ * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+ * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ * UNASSIGNED -> FAILED/KILLED
+ */
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
- taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ isCleaningup()) {
wasKilled = true;
if (wasFailure) {
failures += 1;
}
runner.kill();
- taskStatus.setRunState((wasFailure) ?
- TaskStatus.State.FAILED :
- TaskStatus.State.KILLED);
+ if (task.isMapOrReduce()) {
+ taskStatus.setRunState((wasFailure) ?
+ TaskStatus.State.FAILED_UNCLEAN :
+ TaskStatus.State.KILLED_UNCLEAN);
+ } else {
+ // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else {
+ taskStatus.setRunState((wasFailure) ?
+ TaskStatus.State.FAILED :
+ TaskStatus.State.KILLED);
+ }
+ }
} else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
if (wasFailure) {
failures += 1;
@@ -2287,6 +2396,7 @@
taskStatus.setRunState(TaskStatus.State.KILLED);
}
}
+ removeFromMemoryManager(task.getTaskID());
releaseSlot();
}
@@ -2338,7 +2448,12 @@
synchronized (TaskTracker.this) {
if (needCleanup) {
- tasks.remove(taskId);
+ // see if tasks data structure is holding this tip.
+ // tasks could hold the tip for cleanup attempt, if cleanup attempt
+ // got launched before this method.
+ if (tasks.get(taskId) == this) {
+ tasks.remove(taskId);
+ }
}
synchronized (this){
if (alwaysKeepTaskFiles ||
@@ -2350,8 +2465,8 @@
}
synchronized (this) {
try {
- String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
- + task.getJobID() + Path.SEPARATOR + taskId;
+ String taskDir = getLocalTaskDir(task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2572,15 +2687,10 @@
}
if (tip != null) {
if (!commitPending) {
- tip.taskFinished();
- // Remove the entry from taskMemoryManagerThread's data structures.
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.removeTask(taskid);
- }
- tip.releaseSlot();
+ tip.reportTaskFinished();
}
} else {
- LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+ LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
}
}
@@ -2807,15 +2917,13 @@
// Index file
Path indexFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
- jobId + Path.SEPARATOR +
- mapId + "/output" + "/file.out.index", conf);
+ TaskTracker.getIntermediateOutputDir(jobId, mapId)
+ + "/file.out.index", conf);
// Map-output file
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
- jobId + Path.SEPARATOR +
- mapId + "/output" + "/file.out", conf);
+ TaskTracker.getIntermediateOutputDir(jobId, mapId)
+ + "/file.out", conf);
/**
* Read the index file to get the information about where
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb 5 17:39:27 2009
@@ -250,7 +250,8 @@
TaskStatus.State state = ts.getRunState();
if (ts.getIsMap() &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ ts.inTaskCleanupPhase())) {
mapCount++;
}
}
@@ -267,7 +268,8 @@
TaskStatus.State state = ts.getRunState();
if ((!ts.getIsMap()) &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ ts.inTaskCleanupPhase())) {
reduceCount++;
}
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb 5 17:39:27 2009
@@ -52,9 +52,10 @@
* encapsulates the events and whether to reset events index.
* Version 13 changed the getTask method signature for HADOOP-249
* Version 14 changed the getTask method signature for HADOOP-4232
+ * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
* */
- public static final long versionID = 14L;
+ public static final long versionID = 15L;
/**
* Called when a child task process starts, to get its task.
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Thu Feb 5 17:39:27 2009
@@ -174,8 +174,10 @@
@Override
public void abortTask(TaskAttemptContext context) {
try {
- context.progress();
- outputFileSystem.delete(workPath, true);
+ if (workPath != null) {
+ context.progress();
+ outputFileSystem.delete(workPath, true);
+ }
} catch (IOException ie) {
LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
}
Added: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=741203&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java (added)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java Thu Feb 5 17:39:27 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestTaskFail extends TestCase {
+ public static class MapperClass extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+ String taskid;
+ public void configure(JobConf job) {
+ taskid = job.get("mapred.task.id");
+ }
+ public void map (LongWritable key, Text value,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ if (taskid.endsWith("_0")) {
+ throw new IOException();
+ } else if (taskid.endsWith("_1")) {
+ System.exit(-1);
+ }
+ }
+ }
+
+ public RunningJob launchJob(JobConf conf,
+ Path inDir,
+ Path outDir,
+ String input)
+ throws IOException {
+ // set up the input file system and write input text.
+ FileSystem inFs = inDir.getFileSystem(conf);
+ FileSystem outFs = outDir.getFileSystem(conf);
+ outFs.delete(outDir, true);
+ if (!inFs.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ {
+ // write input into input file
+ DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+ }
+
+ // configure the mapred Job
+ conf.setMapperClass(MapperClass.class);
+ conf.setReducerClass(IdentityReducer.class);
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+ "/tmp")).toString().replace(' ', '+');
+ conf.set("test.build.data", TEST_ROOT_DIR);
+ // return the RunningJob handle.
+ return new JobClient(conf).submitJob(conf);
+ }
+
+ public void testWithDFS() throws IOException {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ try {
+ final int taskTrackers = 4;
+
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 4, true, null);
+ fileSys = dfs.getFileSystem();
+ mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+ JobConf jobConf = mr.createJobConf();
+ final Path inDir = new Path("./input");
+ final Path outDir = new Path("./output");
+ String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+ RunningJob job = null;
+
+ job = launchJob(jobConf, inDir, outDir, input);
+ // wait for the job to finish.
+ while (!job.isComplete());
+ assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+
+ JobID jobId = job.getID();
+ // construct the task id of first map task
+ TaskAttemptID attemptId =
+ new TaskAttemptID(new TaskID(jobId, true, 0), 0);
+ TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+ getTip(attemptId.getTaskID());
+ // this should not be cleanup attempt since the first attempt
+ // fails with an exception
+ assertTrue(!tip.isCleanupAttempt(attemptId));
+ TaskStatus ts =
+ mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+ assertTrue(ts != null);
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+ attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
+ // this should be cleanup attempt since the second attempt fails
+ // with System.exit
+ assertTrue(tip.isCleanupAttempt(attemptId));
+ ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+ assertTrue(ts != null);
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+ } finally {
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown(); }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception {
+ TestTaskFail td = new TestTaskFail();
+ td.testWithDFS();
+ }
+}
Modified: hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp Thu Feb 5 17:39:27 2009
@@ -67,13 +67,19 @@
}
}
}
- TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
- : null;
+ TaskInProgress tip = null;
+ if (job != null && tipidObj != null) {
+ tip = job.getTaskInProgress(tipidObj);
+ }
+ TaskStatus[] ts = null;
+ if (tip != null) {
+ ts = tip.getTaskStatuses();
+ }
boolean isCleanupOrSetup = false;
- if (tipidObj != null) {
- isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+ if ( tip != null) {
+ isCleanupOrSetup = tip.isJobCleanupTask();
if (!isCleanupOrSetup) {
- isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+ isCleanupOrSetup = tip.isJobSetupTask();
}
}
%>
@@ -115,14 +121,41 @@
TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
out.print("<tr><td>" + status.getTaskID() + "</td>");
String taskAttemptTracker = null;
+ String cleanupTrackerName = null;
+ TaskTrackerStatus cleanupTracker = null;
+ String cleanupAttemptTracker = null;
+ boolean hasCleanupAttempt = false;
+ if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+ cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+ cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+ if (cleanupTracker != null) {
+ cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+ + cleanupTracker.getHttpPort();
+ }
+ hasCleanupAttempt = true;
+ }
+ out.print("<td>");
+ if (hasCleanupAttempt) {
+ out.print("Task attempt: ");
+ }
if (taskTracker == null) {
- out.print("<td>" + taskTrackerName + "</td>");
+ out.print(taskTrackerName);
} else {
taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
+ taskTracker.getHttpPort();
- out.print("<td><a href=\"" + taskAttemptTracker + "\">"
- + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+ out.print("<a href=\"" + taskAttemptTracker + "\">"
+ + tracker.getNode(taskTracker.getHost()) + "</a>");
+ }
+ if (hasCleanupAttempt) {
+ out.print("<br/>Cleanup Attempt: ");
+ if (cleanupAttemptTracker == null ) {
+ out.print(cleanupTrackerName);
+ } else {
+ out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+ + tracker.getNode(cleanupTracker.getHost()) + "</a>");
}
+ }
+ out.print("</td>");
out.print("<td>" + status.getRunState() + "</td>");
out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
+ ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
String.valueOf(taskTracker.getHttpPort()),
status.getTaskID().toString());
}
+ if (hasCleanupAttempt) {
+ out.print("Task attempt: <br/>");
+ }
if (taskLogUrl == null) {
out.print("n/a");
} else {
@@ -172,6 +208,25 @@
out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
}
+ if (hasCleanupAttempt) {
+ out.print("Cleanup attempt: <br/>");
+ taskLogUrl = null;
+ if (cleanupTracker != null ) {
+ taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+ String.valueOf(cleanupTracker.getHttpPort()),
+ status.getTaskID().toString());
+ }
+ if (taskLogUrl == null) {
+ out.print("n/a");
+ } else {
+ String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+ String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+ String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+ out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+ out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+ out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+ }
+ }
out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
+ "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
+ ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");
|