hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r692408 [1/2] - in /hadoop/core/trunk: ./ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/
Date Fri, 05 Sep 2008 10:50:06 GMT
Author: ddas
Date: Fri Sep  5 03:50:04 2008
New Revision: 692408

URL: http://svn.apache.org/viewvc?rev=692408&view=rev
Log:
HADOOP-3150. Moves task promotion to tasks. Defines a new interface for committing output files. Moves job setup to jobclient, and moves jobcleanup to a separate task. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
    hadoop/core/trunk/src/webapps/job/jobdetails.jsp
    hadoop/core/trunk/src/webapps/job/jobtasks.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep  5 03:50:04 2008
@@ -47,6 +47,10 @@
     so increment the InterTrackerProtocol version. (Hemanth Yamijala via 
     omalley)
 
+    HADOOP-3150. Moves task promotion to tasks. Defines a new interface for
+    committing output files. Moves job setup to jobclient, and moves jobcleanup
+    to a separate task. (Amareshwari Sriramadasu via ddas)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Fri Sep  5 03:50:04 2008
@@ -715,7 +715,8 @@
       <p>We will then discuss other core interfaces including 
       <code>JobConf</code>, <code>JobClient</code>, <code>Partitioner</code>, 
       <code>OutputCollector</code>, <code>Reporter</code>, 
-      <code>InputFormat</code>, <code>OutputFormat</code> and others.</p>
+      <code>InputFormat</code>, <code>OutputFormat</code>,
+      <code>OutputCommitter</code> and others.</p>
       
       <p>Finally, we will wrap up by discussing some useful features of the
       framework such as the <code>DistributedCache</code>, 
@@ -1013,8 +1014,9 @@
  
         <p><code>JobConf</code> is typically used to specify the 
         <code>Mapper</code>, combiner (if any), <code>Partitioner</code>, 
-        <code>Reducer</code>, <code>InputFormat</code> and 
-        <code>OutputFormat</code> implementations. <code>JobConf</code> also 
+        <code>Reducer</code>, <code>InputFormat</code>, 
+        <code>OutputFormat</code> and <code>OutputCommitter</code> 
+        implementations. <code>JobConf</code> also 
         indicates the set of input files 
         (<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/setinputpaths">setInputPaths(JobConf, Path...)</a>
         /<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/addinputpath">addInputPath(JobConf, Path)</a>)
@@ -1428,6 +1430,45 @@
  
         <p><code>TextOutputFormat</code> is the default 
         <code>OutputFormat</code>.</p>
+
+        <section>
+        <title>OutputCommitter</title>
+        
+        <p><a href="ext:api/org/apache/hadoop/mapred/outputcommitter">
+        OutputCommitter</a> describes the commit of task output for a 
+        Map/Reduce job.</p>
+
+        <p>The Map/Reduce framework relies on the <code>OutputCommitter</code>
+        of the job to:</p>
+        <ol>
+          <li>
+            Set up the job during initialization. For example, create
+            the temporary output directory for the job during the
+            initialization of the job. The job client does the setup
+            for the job.
+          </li>
+          <li>
+            Clean up the job after the job completes. For example, remove
+            the temporary output directory after the job completes. A separate
+            task performs the job cleanup at the end of the job.
+          </li>
+          <li>
+            Set up the task's temporary output.
+          </li> 
+          <li>
+            Check whether a task needs a commit. This is to avoid the commit
+            procedure if a task does not need a commit.
+          </li>
+          <li>
+            Commit the task output. 
+          </li> 
+          <li>
+            Abort the task commit, discarding the task output.
+          </li>
+        </ol>
+        <p><code>FileOutputCommitter</code> is the default 
+        <code>OutputCommitter</code>.</p>
+        </section>
  
         <section>
           <title>Task Side-Effect Files</title>
@@ -1443,7 +1484,9 @@
           (using the attemptid, say <code>attempt_200709221812_0001_m_000000_0</code>), 
           not just per task.</p> 
  
-          <p>To avoid these issues the Map/Reduce framework maintains a special 
+          <p>To avoid these issues the Map/Reduce framework, when the 
+          <code>OutputCommitter</code> is <code>FileOutputCommitter</code>, 
+          maintains a special 
           <code>${mapred.output.dir}/_temporary/_${taskid}</code> sub-directory
           accessible via <code>${mapred.work.output.dir}</code>
           for each task-attempt on the <code>FileSystem</code> where the output

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml Fri Sep  5 03:50:04 2008
@@ -190,6 +190,7 @@
               <outputcollector href="OutputCollector.html">
                 <collect href="#collect(K, V)" />
               </outputcollector>
+              <outputcommitter href="OutputCommitter.html" />
               <outputformat href="OutputFormat.html" />
               <outputlogfilter href="OutputLogFilter.html" />
               <sequencefileoutputformat href="SequenceFileOutputFormat.html">

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java Fri Sep  5 03:50:04 2008
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} 
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to commit the output
+ * of the task.
+ * 
+ */
+class CommitTaskAction extends TaskTrackerAction {
+  private TaskAttemptID taskId;
+  
+  public CommitTaskAction() {
+    super(ActionType.COMMIT_TASK);
+  }
+  
+  public CommitTaskAction(TaskAttemptID taskId) {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = taskId;
+  }
+  
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    taskId = TaskAttemptID.read(in);
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Sep  5 03:50:04 2008
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+/** An {@link OutputCommitter} that commits files specified 
+ * in job output directory i.e. ${mapred.output.dir}. 
+ **/
+public class FileOutputCommitter extends OutputCommitter {
+
+  public static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.mapred.FileOutputCommitter");
+/**
+   * Temporary directory name 
+   */
+  public static final String TEMP_DIR_NAME = "_temporary";
+
+  public void setupJob(JobContext context) throws IOException {
+    JobConf conf = context.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(conf);
+      if (!fileSys.mkdirs(tmpDir)) {
+        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+      }
+    }
+  }
+
+  public void cleanupJob(JobContext context) throws IOException {
+    JobConf conf = context.getJobConf();
+    // do the clean up of temporary directory
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(conf);
+      context.getProgressible().progress();
+      if (fileSys.exists(tmpDir)) {
+        fileSys.delete(tmpDir, true);
+      }
+    }
+  }
+
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    // FileOutputCommitter's setupTask doesn't do anything, because the
+    // temporary task directory is created on demand when the 
+    // task is writing.
+  }
+		  
+  public void commitTask(TaskAttemptContext context) 
+  throws IOException {
+    Path taskOutputPath = getTempTaskOutputPath(context);
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    JobConf job = context.getJobConf();
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(job);
+      context.getProgressible().progress();
+      if (fs.exists(taskOutputPath)) {
+        Path jobOutputPath = taskOutputPath.getParent().getParent();
+        // Move the task outputs to their final place
+        moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
+        // Delete the temporary task-specific output directory
+        if (!fs.delete(taskOutputPath, true)) {
+          LOG.info("Failed to delete the temporary output" + 
+          " directory of task: " + attemptId + " - " + taskOutputPath);
+        }
+        LOG.info("Saved output of task '" + attemptId + "' to " + 
+                 jobOutputPath);
+      }
+    }
+  }
+		  
+  private void moveTaskOutputs(TaskAttemptContext context,
+                               FileSystem fs,
+                               Path jobOutputDir,
+                               Path taskOutput) 
+  throws IOException {
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    context.getProgressible().progress();
+    if (fs.isFile(taskOutput)) {
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+                                          getTempTaskOutputPath(context));
+      if (!fs.rename(taskOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of task: " + 
+                                 attemptId);
+        }
+        if (!fs.rename(taskOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of task: " + 
+        		  attemptId);
+        }
+      }
+      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
+    } else if(fs.getFileStatus(taskOutput).isDir()) {
+      FileStatus[] paths = fs.listStatus(taskOutput);
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+	          getTempTaskOutputPath(context));
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+
+  public void abortTask(TaskAttemptContext context) {
+    Path taskOutputPath =  getTempTaskOutputPath(context);
+    try {
+      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+      context.getProgressible().progress();
+      fs.delete(taskOutputPath, true);
+    } catch (IOException ie) {
+      LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
+    }
+  }
+
+  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
+                            Path taskOutputPath) {
+    URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
+    if (relativePath.getPath().length() > 0) {
+      return new Path(jobOutputDir, relativePath.getPath());
+    } else {
+      return jobOutputDir;
+    }
+  }
+
+  public boolean needsTaskCommit(TaskAttemptContext context) 
+  throws IOException {
+    try {
+      Path taskOutputPath = getTempTaskOutputPath(context);
+      if (taskOutputPath != null) {
+        context.getProgressible().progress();
+        // Get the file-system for the task output directory
+        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+        // since task output path is created on demand, 
+        // if it exists, task needs a commit
+        if (fs.exists(taskOutputPath)) {
+          return true;
+        }
+      }
+    } catch (IOException  ioe) {
+      throw ioe;
+    }
+    return false;
+  }
+
+  Path getTempTaskOutputPath(TaskAttemptContext taskContext) {
+    JobConf conf = taskContext.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      Path p = new Path(outputPath,
+                     (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+                      "_" + taskContext.getTaskAttemptID().toString()));
+      try {
+        FileSystem fs = p.getFileSystem(conf);
+        return p.makeQualified(fs);
+      } catch (IOException ie) {
+        LOG.warn(StringUtils .stringifyException(ie));
+        return p;
+      }
+    }
+    return null;
+  }
+  
+  Path getWorkPath(TaskAttemptContext taskContext, Path basePath) 
+  throws IOException {
+    // ${mapred.out.dir}/_temporary
+    Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
+    FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
+    if (!fs.exists(jobTmpDir)) {
+      throw new IOException("The temporary job-output directory " + 
+          jobTmpDir.toString() + " doesn't exist!"); 
+    }
+    // ${mapred.out.dir}/_temporary/_${taskid}
+    String taskid = taskContext.getTaskAttemptID().toString();
+    Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
+    if (!fs.mkdirs(taskTmpDir)) {
+      throw new IOException("Mkdirs failed to create " 
+          + taskTmpDir.toString());
+    }
+    return taskTmpDir;
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Sep  5 03:50:04 2008
@@ -152,6 +152,12 @@
    *  
    * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
    * 
+   * <p><i>Note:</i> The following is valid only if the {@link OutputCommitter}
+   *  is {@link FileOutputCommitter}. If <code>OutputCommitter</code> is not 
+   *  a <code>FileOutputCommitter</code>, the task's temporary output
+   *  directory is the same as {@link #getOutputPath(JobConf)}, i.e.
+   *  <tt>${mapred.output.dir}</tt>.</p>
+   *  
    * <p>Some applications need to create/write-to side-files, which differ from
    * the actual job-outputs.
    * 
@@ -207,29 +213,23 @@
    */
   protected static Path getTaskOutputPath(JobConf conf, String name) 
   throws IOException {
-    // ${mapred.job.dir}
+    // ${mapred.out.dir}
     Path outputPath = getOutputPath(conf);
     if (outputPath == null) {
       throw new IOException("Undefined job output-path");
     }
 
-    // ${mapred.out.dir}/_temporary
-    Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-    FileSystem fs = jobTmpDir.getFileSystem(conf);
-    if (!fs.exists(jobTmpDir)) {
-      throw new IOException("The temporary job-output directory " + 
-          jobTmpDir.toString() + " doesn't exist!"); 
-    }
-
-    // ${mapred.out.dir}/_temporary/_${taskid}
-    Path taskTmpDir = getWorkOutputPath(conf);
-    if (!fs.mkdirs(taskTmpDir)) {
-      throw new IOException("Mkdirs failed to create " 
-          + taskTmpDir.toString());
+    OutputCommitter committer = conf.getOutputCommitter();
+    Path workPath = outputPath;
+    TaskAttemptContext context = new TaskAttemptContext(conf,
+                TaskAttemptID.forName(conf.get("mapred.task.id")));
+    if (committer instanceof FileOutputCommitter) {
+      workPath = ((FileOutputCommitter)committer).getWorkPath(context,
+                                                              outputPath);
     }
     
     // ${mapred.out.dir}/_temporary/_${taskid}/${name}
-    return new Path(taskTmpDir, name);
+    return new Path(workPath, name);
   } 
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Sep  5 03:50:04 2008
@@ -46,8 +46,9 @@
    * Version 14: replaced getFilesystemName with getSystemDir for HADOOP-3135
    * Version 15: Changed format of Task and TaskStatus for HADOOP-153
    * Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
+   * Version 17: Changed format of Task and TaskStatus for HADOOP-3150
    */
-  public static final long versionID = 16L;
+  public static final long versionID = 17L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Sep  5 03:50:04 2008
@@ -46,7 +46,7 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public void done(TaskAttemptID taskid, boolean shouldPromote) throws IOException {
+    public void done(TaskAttemptID taskid) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }
 
@@ -66,6 +66,15 @@
       return true;
     }
 
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
+    throws IOException, InterruptedException {
+      statusUpdate(taskId, taskStatus);
+    }
+    
+    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+    
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Sep  5 03:50:04 2008
@@ -253,6 +253,15 @@
     }
 
     /**
+     * A float between 0.0 and 1.0, indicating the % of cleanup work
+     * completed.
+     */
+    public float cleanupProgress() throws IOException {
+      ensureFreshStatus();
+      return status.cleanupProgress();
+    }
+
+    /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
@@ -786,6 +795,13 @@
       out.close();
     }
 
+    // Skip doing setup if there are no maps for the job, because
+    // if there are no maps, the job is considered completed and successful.
+    if (splits.length != 0) {
+      // do setupJob
+      job.getOutputCommitter().setupJob(new JobContext(job));
+    }
+
     //
     // Now, actually submit the job (using the submit name)
     //
@@ -967,7 +983,18 @@
   public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getReduceTaskReports(jobId);
   }
-   
+
+  /**
+   * Get the information of the current state of the cleanup tasks of a job.
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the cleanup tips.
+   * @throws IOException
+   */    
+  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
+    return jobSubmitClient.getCleanupTaskReports(jobId);
+  }
+  
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   @Deprecated
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Sep  5 03:50:04 2008
@@ -360,6 +360,28 @@
                                                               OutputFormat.class),
                                                      this);
   }
+
+  /**
+   * Get the {@link OutputCommitter} implementation for the map-reduce job,
+   * defaults to {@link FileOutputCommitter} if not specified explicitly.
+   * 
+   * @return the {@link OutputCommitter} implementation for the map-reduce job.
+   */
+  public OutputCommitter getOutputCommitter() {
+    return (OutputCommitter)ReflectionUtils.newInstance(
+      getClass("mapred.output.committer.class", FileOutputCommitter.class,
+               OutputCommitter.class), this);
+  }
+
+  /**
+   * Set the {@link OutputCommitter} implementation for the map-reduce job.
+   * 
+   * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
+   *                 job.
+   */
+  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
+    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
+  }
   
   /**
    * Set the {@link OutputFormat} implementation for the map-reduce job.

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java Fri Sep  5 03:50:04 2008
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+public class JobContext {
+
+  JobConf job;
+  private Progressable progress;
+
+  JobContext(JobConf conf, Progressable progress) {
+    job = conf;
+    this.progress = progress;
+  }
+
+  JobContext(JobConf conf) {
+    this(conf, Reporter.NULL);
+  }
+  
+  /**
+   * Get the job Configuration
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return job;
+  }
+  
+  /**
+   * Get the progress mechanism for reporting progress.
+   * 
+   * @return progress mechanism 
+   */
+  public Progressable getProgressible() {
+    return progress;
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Sep  5 03:50:04 2008
@@ -61,6 +61,7 @@
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
+  TaskInProgress cleanup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
   
@@ -76,6 +77,8 @@
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
+  private volatile boolean launchedCleanup = false;
+  private volatile boolean jobKilled = false;
 
   JobPriority priority = JobPriority.NORMAL;
   JobTracker jobtracker = null;
@@ -361,6 +364,7 @@
       this.finishTime = System.currentTimeMillis();
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
+      status.setCleanupProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited.set(true);
       JobHistory.JobInfo.logStarted(profile.getJobID(), 
@@ -385,15 +389,18 @@
       nonRunningReduces.add(reduces[i]);
     }
 
-    // create job specific temporary directory in output path
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(conf);
-      if (!fileSys.mkdirs(tmpDir)) {
-        LOG.error("Mkdirs failed to create " + tmpDir.toString());
-      }
-    }
+    // create two cleanup tips, one map and one reduce.
+    cleanup = new TaskInProgress[2];
+    // cleanup map tip. This map doesn't use a split, 
+    // so just assign splits[0].
+    cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+            jobtracker, conf, this, numMapTasks);
+    cleanup[0].setCleanupTask();
+
+    // cleanup reduce tip.
+    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks, jobtracker, conf, this);
+    cleanup[1].setCleanupTask();
 
     this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
     tasksInited.set(true);
@@ -463,6 +470,14 @@
   }
     
   /**
+   * Get the list of cleanup tasks.
+   * Before the tasks are initialized this is an empty array; afterwards
+   * index 0 holds the map-side cleanup tip and index 1 the reduce-side one.
+   * NOTE: this returns the internal array itself, not a copy, so callers
+   * must not modify it.
+   * @return the array of cleanup tasks for the job
+   */
+  TaskInProgress[] getCleanupTasks() {
+    return cleanup;
+  }
+  
+  /**
    * Get the list of reduce tasks
    * @return the raw array of reduce tasks for this job
    */
@@ -481,7 +496,7 @@
   /**
    * Return a vector of completed TaskInProgress objects
    */
-  public Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
+  public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
                                                       boolean shouldBeComplete) {
     
     Vector<TaskInProgress> results = new Vector<TaskInProgress>();
@@ -498,6 +513,21 @@
     }
     return results;
   }
+  
+  /**
+   * Collect the cleanup TaskInProgress objects whose completion state
+   * matches the requested one.
+   *
+   * @param shouldBeComplete true to select completed cleanup tips,
+   *                         false to select those not yet complete
+   * @return a new vector holding the matching cleanup tips, in array order
+   */
+  public synchronized Vector<TaskInProgress> reportCleanupTIPs(
+                                               boolean shouldBeComplete) {
+    Vector<TaskInProgress> matching = new Vector<TaskInProgress>();
+    for (TaskInProgress cleanupTip : cleanup) {
+      boolean complete = cleanupTip.isComplete();
+      if (complete == shouldBeComplete) {
+        matching.add(cleanupTip);
+      }
+    }
+    return matching;
+  }
 
   ////////////////////////////////////////////////////
   // Status update methods
@@ -509,6 +539,9 @@
     double oldProgress = tip.getProgress();   // save old progress
     boolean wasRunning = tip.isRunning();
     boolean wasComplete = tip.isComplete();
+    boolean wasPending = tip.isOnlyCommitPending();
+    TaskAttemptID taskid = status.getTaskID();
+    
     // If the TIP is already completed and the task reports as SUCCEEDED then 
     // mark the task as KILLED.
     // In case of task with no promotion the task tracker will mark the task 
@@ -523,13 +556,6 @@
         this.jobtracker.getTaskTracker(status.getTaskTracker());
       String httpTaskLogLocation = null; 
 
-      if (state == TaskStatus.State.COMMIT_PENDING) {
-        JobWithTaskContext j = new JobWithTaskContext(this, tip, 
-                                                      status.getTaskID(),
-                                                      metrics);
-        jobtracker.addToCommitQueue(j);
-      }
-
       if (null != ttStatus){
         String host;
         if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
@@ -545,7 +571,7 @@
       if (state == TaskStatus.State.SUCCEEDED) {
         taskEvent = new TaskCompletionEvent(
                                             taskCompletionEventTracker, 
-                                            status.getTaskID(),
+                                            taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
@@ -554,27 +580,29 @@
         taskEvent.setTaskRunTime((int)(status.getFinishTime() 
                                        - status.getStartTime()));
         tip.setSuccessEventNumber(taskCompletionEventTracker); 
-      }
-      //For a failed task update the JT datastructures.For the task state where
-      //only the COMMIT is pending, delegate everything to the JT thread. For
-      //failed tasks we want the JT to schedule a reexecution ASAP (and not go
-      //via the queue for the datastructures' updates).
-      else if (state == TaskStatus.State.COMMIT_PENDING) {
+      } else if (state == TaskStatus.State.COMMIT_PENDING) {
+        // If it is the first attempt reporting COMMIT_PENDING
+        // ask the task to commit.
+        if (!wasComplete && !wasPending) {
+          tip.doCommit(taskid);
+        }
         return;
-      } else if (state == TaskStatus.State.FAILED ||
-                 state == TaskStatus.State.KILLED) {
+      }
+      //For a failed task update the JT datastructures. 
+      else if (state == TaskStatus.State.FAILED ||
+               state == TaskStatus.State.KILLED) {
         // Get the event number for the (possibly) previously successful
         // task. If there exists one, then set that status to OBSOLETE 
         int eventNumber;
         if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
           TaskCompletionEvent t = 
             this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskAttemptId().equals(status.getTaskID()))
+          if (t.getTaskAttemptId().equals(taskid))
             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
         }
         
         // Tell the job to fail the relevant task
-        failedTask(tip, status.getTaskID(), status, ttStatus,
+        failedTask(tip, taskid, status, ttStatus,
                    wasRunning, wasComplete, metrics);
 
         // Did the task failure lead to tip failure?
@@ -586,7 +614,7 @@
           taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
         }
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
-                                            status.getTaskID(),
+                                            taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap(),
                                             taskCompletionStatus, 
@@ -616,21 +644,24 @@
       LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
                  oldProgress + " to " + tip.getProgress());
     }
-    double progressDelta = tip.getProgress() - oldProgress;
-    if (tip.isMapTask()) {
-      if (maps.length == 0) {
-        this.status.setMapProgress(1.0f);
-      } else {
-        this.status.setMapProgress((float) (this.status.mapProgress() +
-                                            progressDelta / maps.length));
-      }
-    } else {
-      if (reduces.length == 0) {
-        this.status.setReduceProgress(1.0f);
+    
+    if (!tip.isCleanupTask()) {
+      double progressDelta = tip.getProgress() - oldProgress;
+      if (tip.isMapTask()) {
+        if (maps.length == 0) {
+          this.status.setMapProgress(1.0f);
+        } else {
+          this.status.setMapProgress((float) (this.status.mapProgress() +
+                                              progressDelta / maps.length));
+        }
       } else {
-        this.status.setReduceProgress
-          ((float) (this.status.reduceProgress() +
-                    (progressDelta / reduces.length)));
+        if (reduces.length == 0) {
+          this.status.setReduceProgress(1.0f);
+        } else {
+          this.status.setReduceProgress
+            ((float) (this.status.reduceProgress() +
+                      (progressDelta / reduces.length)));
+        }
       }
     }
   }
@@ -662,7 +693,7 @@
    *  Returns the total job counters, by adding together the job, 
    *  the map and the reduce counters.
    */
-  public Counters getCounters() {
+  public synchronized Counters getCounters() {
     Counters result = new Counters();
     result.incrAllCounters(getJobCounters());
     incrementTaskCounters(result, maps);
@@ -720,6 +751,77 @@
   }    
 
   /**
+   * Return a CleanupTask, if appropriate, to run on the given tasktracker.
+   * 
+   * Returns null when the cleanup task cannot be launched yet (see
+   * canLaunchCleanupTask()), when this job should not run on the tracker,
+   * when the selected cleanup tip is not picked by findTaskFromList, or
+   * when the tip has no attempt to run.
+   *
+   * @param tts status of the tracker asking for work
+   * @param clusterSize current cluster size; cached in this.clusterSize
+   * @param numUniqueHosts number of unique hosts, passed to findTaskFromList
+   * @param isMapSlot true to offer the map-side cleanup tip (cleanup[0]),
+   *                  false to offer the reduce-side one (cleanup[1])
+   * @return the cleanup task to launch, or null
+   * @throws IOException if one of the helpers called here fails
+   */
+  public synchronized Task obtainCleanupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if (!canLaunchCleanupTask()) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    
+    // NOTE(review): a mutable list is used here; findTaskFromList may
+    // remove entries from the list it is given -- confirm before replacing
+    // this with an immutable singleton list.
+    List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
+    if (isMapSlot) {
+      cleanupTaskList.add(cleanup[0]);
+    } else {
+      cleanupTaskList.add(cleanup[1]);
+    }
+    TaskInProgress tip = findTaskFromList(cleanupTaskList,
+                           tts, numUniqueHosts, false);
+    if (tip == null) {
+      return null;
+    }
+    
+    // Now launch the cleanupTask
+    Task result = tip.getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      // Stops further cleanup attempts from being handed out; the
+      // task-failure path resets this flag when a cleanup attempt fails.
+      launchedCleanup = true;
+      if (tip.isFirstAttempt(result.getTaskID())) {
+        JobHistory.Task.logStarted(tip.getTIPId(), 
+          tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
+          System.currentTimeMillis(), tip.getSplitNodes());
+      }
+    }
+    return result;
+  }
+  
+  /**
+   * Decide whether the cleanup task of this job may be handed out now.
+   *
+   * The cleanup task is launchable only while no cleanup attempt is
+   * outstanding, and then only if either the job has been killed or every
+   * map tip and every reduce tip has finished or failed.
+   *
+   * @return true if the cleanup task can be launched, false otherwise
+   */
+  private synchronized boolean canLaunchCleanupTask() {
+    if (launchedCleanup) {
+      // a cleanup attempt is already out
+      return false;
+    }
+    if (jobKilled) {
+      // a killed job goes straight to cleanup
+      return true;
+    }
+    boolean mapsDone = (finishedMapTasks + failedMapTIPs) == numMapTasks;
+    boolean reducesDone =
+        (finishedReduceTasks + failedReduceTIPs) == numReduceTasks;
+    return mapsDone && reducesDone;
+  }
+  
+  /**
    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
    * We don't have cache-sensitivity for reduce tasks, as they
    *  work on temporary MapRed files.  
@@ -1408,27 +1510,45 @@
     }
         
     // Update the running/finished map/reduce counts
-    if (tip.isMapTask()){
-      runningMapTasks -= 1;
-      finishedMapTasks += 1;
-      metrics.completeMap(taskid);
-      // remove the completed map from the resp running caches
-      retireMap(tip);
-    } else{
-      runningReduceTasks -= 1;
-      finishedReduceTasks += 1;
-      metrics.completeReduce(taskid);
-      // remove the completed reduces from the running reducers set
-      retireReduce(tip);
-    }
-        
-    //
-    // Figure out whether the Job is done
-    //
-    isJobComplete(tip, metrics);
-    
-    if (this.status.getRunState() != JobStatus.RUNNING) {
-      // The job has been killed/failed, 
+    if (!tip.isCleanupTask()) {
+      if (tip.isMapTask()) {
+        runningMapTasks -= 1;
+        finishedMapTasks += 1;
+        metrics.completeMap(taskid);
+        // remove the completed map from the resp running caches
+        retireMap(tip);
+        if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
+          this.status.setMapProgress(1.0f);
+        }
+      } else {
+        runningReduceTasks -= 1;
+        finishedReduceTasks += 1;
+        metrics.completeReduce(taskid);
+        // remove the completed reduces from the running reducers set
+        retireReduce(tip);
+        if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
+          this.status.setReduceProgress(1.0f);
+        }
+      }
+    } else {
+      // cleanup task has finished. Kill the extra cleanup tip
+      if (tip.isMapTask()) {
+        // kill the reduce tip
+        cleanup[1].kill();
+      } else {
+        cleanup[0].kill();
+      }
+      //
+      // The Job is done
+      //
+      // if the job is killed, then mark the job failed.
+      if (jobKilled) {
+        killJob();
+      }
+      else {
+        jobComplete(metrics);
+      }
+      // The job has been killed/failed/successful
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
       return false;
@@ -1438,31 +1558,18 @@
   }
 
   /**
-   * Check if the job is done since all it's component tasks are either
+   * The job is done since all its component tasks are either
    * successful or have failed.
    * 
-   * @param tip the current tip which completed either succesfully or failed
    * @param metrics job-tracker metrics
-   * @return
    */
-  private boolean isJobComplete(TaskInProgress tip, JobTrackerInstrumentation metrics) {
-    // Job is complete if total-tips = finished-tips + failed-tips
-    boolean allDone = 
-      ((finishedMapTasks + failedMapTIPs) == numMapTasks);
-    if (allDone) {
-      if (tip.isMapTask()) {
-        this.status.setMapProgress(1.0f);              
-      }
-      allDone = 
-        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
-    }
-
+  private void jobComplete(JobTrackerInstrumentation metrics) {
     //
-    // If all tasks are complete, then the job is done!
+    // All tasks are complete, then the job is done!
     //
-    if (this.status.getRunState() == JobStatus.RUNNING && allDone) {
+    if (this.status.getRunState() == JobStatus.RUNNING ) {
       this.status.setRunState(JobStatus.SUCCEEDED);
-      this.status.setReduceProgress(1.0f);
+      this.status.setCleanupProgress(1.0f);
       this.finishTime = System.currentTimeMillis();
       garbageCollect();
       LOG.info("Job " + this.status.getJobID() + 
@@ -1472,11 +1579,21 @@
                                      this.finishedReduceTasks, failedMapTasks, 
                                      failedReduceTasks, getCounters());
       metrics.completeJob();
-      return true;
     }
-    
-    return false;
   }
+  
+  /**
+   * Move a RUNNING or PREP job to the terminal FAILED state.
+   *
+   * It replaces the status with one whose map, reduce and cleanup progress
+   * are all 1.0, records the finish time, logs the failure to the job
+   * history and calls garbageCollect(). A job in any other state is left
+   * untouched.
+   *
+   * Visible callers use it when the cleanup tip of a killed job finishes,
+   * or when a cleanup tip itself fails. The method that kills the job's
+   * component tasks no longer sets FAILED itself; it only sets jobKilled
+   * so that cleanup gets scheduled first.
+   */
+  private synchronized void killJob() {
+    if ((status.getRunState() == JobStatus.RUNNING) ||
+        (status.getRunState() == JobStatus.PREP)) {
+      this.status = new JobStatus(status.getJobID(),
+                          1.0f, 1.0f, 1.0f, JobStatus.FAILED);
+      this.finishTime = System.currentTimeMillis();
+      JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
+              this.finishedMapTasks, this.finishedReduceTasks);
+      garbageCollect();
+    }
+  }
+
   /**
    * Kill the job and all its component tasks.
    */
@@ -1484,8 +1601,6 @@
     if ((status.getRunState() == JobStatus.RUNNING) ||
          (status.getRunState() == JobStatus.PREP)) {
       LOG.info("Killing job '" + this.status.getJobID() + "'");
-      this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, JobStatus.FAILED);
-      this.finishTime = System.currentTimeMillis();
       this.runningMapTasks = 0;
       this.runningReduceTasks = 0;
       //
@@ -1497,9 +1612,7 @@
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
-      JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
-                                   this.finishedMapTasks, this.finishedReduceTasks);
-      garbageCollect();
+      jobKilled = true;
     }
   }
 
@@ -1531,7 +1644,9 @@
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
-      if (tip.isMapTask()){
+      if (tip.isCleanupTask()) {
+        launchedCleanup = false;
+      } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -1592,10 +1707,12 @@
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (tip.isMapTask()) {
-      failedMapTasks++; 
-    } else {
-      failedReduceTasks++; 
+    if (!tip.isCleanupTask()) {
+      if (tip.isMapTask()) {
+        failedMapTasks++;
+      } else {
+        failedReduceTasks++; 
+      }
     }
             
     //
@@ -1620,8 +1737,8 @@
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = 
-        tip.isMapTask() ? 
+      boolean killJob = tip.isCleanupTask() ? true :
+                        tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
       
@@ -1638,18 +1755,28 @@
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks
                                     );
-        kill();
-      } else {
-        isJobComplete(tip, metrics);
+        if (tip.isCleanupTask()) {
+          // kill the other tip
+          if (tip.isMapTask()) {
+            cleanup[1].kill();
+          } else {
+            cleanup[0].kill();
+          }
+          killJob();
+        } else {
+          kill();
+        }
       }
       
       //
       // Update the counters
       //
-      if (tip.isMapTask()) {
-        jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
-      } else {
-        jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+      if (!tip.isCleanupTask()) {
+        if (tip.isMapTask()) {
+          jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+        } else {
+          jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+        }
       }
     }
   }
@@ -1711,16 +1838,6 @@
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
       FileSystem fs = tempDir.getFileSystem(conf);
       fs.delete(tempDir, true); 
-
-      // delete the temporary directory in output directory
-      Path outputPath = FileOutputFormat.getOutputPath(conf);
-      if (outputPath != null) {
-        Path tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-        FileSystem fileSys = tmpDir.getFileSystem(conf);
-        if (fileSys.exists(tmpDir)) {
-          fileSys.delete(tmpDir, true);
-        }
-      }
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -1736,8 +1853,13 @@
   /**
    * Return the TaskInProgress that matches the tipid.
    */
-  public TaskInProgress getTaskInProgress(TaskID tipid){
+  public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
+      for (int i = 0; i < cleanup.length; i++) {
+        if (tipid.equals(cleanup[i].getTIPId())){
+          return cleanup[i];
+        }
+      }
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){
           return maps[i];
@@ -1758,7 +1880,7 @@
    * @param mapId the id of the map
    * @return the task status of the completed task
    */
-  public TaskStatus findFinishedMap(int mapId) {
+  public synchronized TaskStatus findFinishedMap(int mapId) {
     TaskInProgress tip = maps[mapId];
     if (tip.isComplete()) {
       TaskStatus[] statuses = tip.getTaskStatuses();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Sep  5 03:50:04 2008
@@ -73,20 +73,68 @@
                                  0.01f);
   }
 
+  /**
+   * Look for a cleanup task to run on the given tracker.
+   *
+   * Map slots are tried first, over the jobs in the queue's iteration
+   * order, and then reduce slots. A slot type is only considered when the
+   * tracker has a free slot of that type.
+   *
+   * @return the first cleanup task found, or null if there is none
+   */
+  protected Task getCleanupTask(int numMaps, int numReduces,
+                                int maxMapTasks, int maxReduceTasks,
+                                TaskTrackerStatus taskTracker,
+                                int numTaskTrackers,
+                                Collection<JobInProgress> jobQueue) 
+  throws IOException {
+    boolean hasFreeMapSlot = numMaps < maxMapTasks;
+    if (hasFreeMapSlot) {
+      for (JobInProgress job : jobQueue) {
+        Task cleanupTask = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+            taskTrackerManager.getNumberOfUniqueHosts(), true);
+        if (cleanupTask != null) {
+          return cleanupTask;
+        }
+      }
+    }
+    boolean hasFreeReduceSlot = numReduces < maxReduceTasks;
+    if (hasFreeReduceSlot) {
+      for (JobInProgress job : jobQueue) {
+        Task cleanupTask = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+            taskTrackerManager.getNumberOfUniqueHosts(), false);
+        if (cleanupTask != null) {
+          return cleanupTask;
+        }
+      }
+    }
+    // no job offered a cleanup task for either slot type
+    return null;
+  }
+  
   @Override
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
-    //
-    // Compute average map and reduce task numbers across pool
-    //
-    int remainingReduceLoad = 0;
-    int remainingMapLoad = 0;
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
+
+    //
+    // Get map + reduce counts for the current tracker.
+    //
+    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
+    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
+    int numMaps = taskTracker.countMapTasks();
+    int numReduces = taskTracker.countReduceTasks();
+
+
+    // cleanup task has the highest priority, it should be 
+    // launched as soon as the job is done.
+    synchronized (jobQueue) {
+      Task t = getCleanupTask(numMaps, numReduces, maxCurrentMapTasks,
+                 maxCurrentReduceTasks, taskTracker, numTaskTrackers, jobQueue);
+      if (t != null) {
+        return Collections.singletonList(t);
+      }
+    }
+
+    //
+    // Compute average map and reduce task numbers across pool
+    //
+    int remainingReduceLoad = 0;
+    int remainingMapLoad = 0;
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
@@ -98,8 +146,6 @@
       }
     }
 
-    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
-    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
     // find out the maximum number of maps or reduces that we are willing
     // to run on any node.
     int maxMapLoad = 0;
@@ -113,13 +159,6 @@
                                                / numTaskTrackers));
     }
         
-    //
-    // Get map + reduce counts for the current tracker.
-    //
-
-    int numMaps = taskTracker.countMapTasks();
-    int numReduces = taskTracker.countReduceTasks();
-    
     int totalMaps = clusterStatus.getMapTasks();
     int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
     int totalReduces = clusterStatus.getReduceTasks();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Sep  5 03:50:04 2008
@@ -49,6 +49,7 @@
   private JobID jobid;
   private float mapProgress;
   private float reduceProgress;
+  private float cleanupProgress;
   private int runState;
   private long startTime;
   private String user;
@@ -63,17 +64,32 @@
    * @param jobid The jobid of the job
    * @param mapProgress The progress made on the maps
    * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on cleanup
    * @param runState The current state of the job
    */
-  public JobStatus(JobID jobid, float mapProgress, float reduceProgress, int runState) {
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+                   float cleanupProgress, int runState) {
     this.jobid = jobid;
     this.mapProgress = mapProgress;
     this.reduceProgress = reduceProgress;
+    this.cleanupProgress = cleanupProgress;
     this.runState = runState;
     this.user = "nobody";
   }
 
   /**
+   * Create a job status object for a given jobid.
+   * The cleanup progress is initialized to 0.0f.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   */
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+                   int runState) {
+    this(jobid, mapProgress, reduceProgress, 0.0f, runState);
+  }
+
+  /**
    * @deprecated use getJobID instead
    */
   @Deprecated
@@ -96,8 +112,21 @@
   synchronized void setMapProgress(float p) { 
     this.mapProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
   }
+
+  /**
+   * @return Progress of the cleanup as a fraction (1.0f means done).
+   *         setCleanupProgress clamps it to [0.0, 1.0]; values passed to
+   *         the constructor or read from the wire are stored as given.
+   */
+  public synchronized float cleanupProgress() { return cleanupProgress; }
     
   /**
+   * Sets the cleanup progress of this job.
+   * Values outside [0.0, 1.0] are clamped to that range.
+   * @param p The value of cleanup progress to set to
+   */
+  synchronized void setCleanupProgress(float p) { 
+    this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
+  /**
    * @return Percentage of progress in reduce 
    */
   public synchronized float reduceProgress() { return reduceProgress; }
@@ -150,6 +179,7 @@
     jobid.write(out);
     out.writeFloat(mapProgress);
     out.writeFloat(reduceProgress);
+    out.writeFloat(cleanupProgress);
     out.writeInt(runState);
     out.writeLong(startTime);
     Text.writeString(out, user);
@@ -159,6 +189,7 @@
     this.jobid = JobID.read(in);
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
+    this.cleanupProgress = in.readFloat();
     this.runState = in.readInt();
     this.startTime = in.readLong();
     this.user = Text.readString(in);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Sep  5 03:50:04 2008
@@ -41,8 +41,10 @@
    * Version 9: change the counter representation for HADOOP-1915
    * Version 10: added getSystemDir for HADOOP-3135
    * Version 11: changed JobProfile to include the queue name for HADOOP-3698
+   * Version 12: Added getCleanupTaskReports and 
+   *             cleanupProgress to JobStatus as part of HADOOP-3150
    */
-  public static final long versionID = 11L;
+  public static final long versionID = 12L;
 
   /**
    * Allocate a name for the job.
@@ -105,6 +107,11 @@
   public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException;
 
   /**
+   * Grab a bunch of info on the cleanup tasks that make up the job
+   */
+  public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
+
+  /**
    * A MapReduce system always operates on a single filesystem.  This 
    * function returns the fs name.  ('local' if the localfs; 'addr:port' 
    * if dfs).  The client can then copy files into the right locations 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep  5 03:50:04 2008
@@ -41,7 +41,6 @@
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,7 +61,6 @@
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlIOException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -512,8 +510,6 @@
   Path systemDir = null;
   private JobConf conf;
 
-  private Thread taskCommitThread;
-  
   private QueueManager queueManager;
 
   /**
@@ -693,8 +689,6 @@
     this.retireJobsThread.start();
     taskScheduler.start();
     expireLaunchingTaskThread.start();
-    this.taskCommitThread = new TaskCommitQueue();
-    this.taskCommitThread.start();
 
     if (completedJobStatusStore.isActive()) {
       completedJobsStoreThread = new Thread(completedJobStatusStore,
@@ -749,15 +743,6 @@
         ex.printStackTrace();
       }
     }
-    if (this.taskCommitThread != null) {
-      LOG.info("Stopping TaskCommit thread");
-      this.taskCommitThread.interrupt();
-      try {
-        this.taskCommitThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1258,6 +1243,12 @@
       actions.addAll(killTasksList);
     }
      
+    // Check for tasks whose outputs can be saved
+    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
+    if (commitTasksList != null) {
+      actions.addAll(commitTasksList);
+    }
+
     // calculate next heartbeat interval and put in heartbeat response
     int nextInterval = getNextHeartbeatInterval();
     response.setHeartbeatInterval(nextInterval);
@@ -1443,6 +1434,30 @@
   }
 
   /**
+   * A tracker wants to know if any of its Tasks can be committed.
+   *
+   * For every task attempt the tracker reports as COMMIT_PENDING, ask the
+   * owning tip whether that attempt should commit. If it should, queue a
+   * CommitTaskAction for it. Attempts the JobTracker no longer knows about
+   * are skipped.
+   *
+   * @param tts status reported by the tracker in its heartbeat
+   * @return the commit actions to send back (possibly empty), or null if
+   *         the tracker reported no task statuses
+   */
+  private synchronized List<TaskTrackerAction> getTasksToSave(
+                                                 TaskTrackerStatus tts) {
+    List<TaskStatus> taskStatuses = tts.getTaskReports();
+    if (taskStatuses != null) {
+      List<TaskTrackerAction> saveList = new ArrayList<TaskTrackerAction>();
+      for (TaskStatus taskStatus : taskStatuses) {
+        if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          TaskAttemptID taskId = taskStatus.getTaskID();
+          TaskInProgress tip = taskidToTIPMap.get(taskId);
+          // The attempt may no longer be tracked (presumably its entry was
+          // already removed); skip it rather than throwing a
+          // NullPointerException out of heartbeat processing.
+          if (tip == null) {
+            continue;
+          }
+          if (tip.shouldCommit(taskId)) {
+            saveList.add(new CommitTaskAction(taskId));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(tts.getTrackerName() + 
+                        " -> CommitTaskAction: " + taskId);
+            }
+          }
+        }
+      }
+      return saveList;
+    }
+    return null;
+  }
+  
+  /**
    * Grab the local fs name
    */
   public synchronized String getFilesystemName() throws IOException {
@@ -1622,7 +1637,30 @@
       return reports.toArray(new TaskReport[reports.size()]);
     }
   }
-    
+
+  /**
+   * Returns task reports for the cleanup TIPs of the given job, completed
+   * TIPs first and incomplete ones after them. Returns an empty array when
+   * {@code jobid} has no entry in the jobs map.
+   */
+  public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
+    JobInProgress job = jobs.get(jobid);
+    if (job == null) {
+      return new TaskReport[0];
+    }
+
+    Vector<TaskReport> result = new Vector<TaskReport>();
+    Vector<TaskInProgress> completed = job.reportCleanupTIPs(true);
+    for (TaskInProgress tip : completed) {
+      result.add(tip.generateSingleReport());
+    }
+    Vector<TaskInProgress> incomplete = job.reportCleanupTIPs(false);
+    for (TaskInProgress tip : incomplete) {
+      result.add(tip.generateSingleReport());
+    }
+    return result.toArray(new TaskReport[result.size()]);
+  }
+  
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
   
   /* 
@@ -1886,184 +1924,6 @@
       removeMarkedTasks(trackerName);
     }
   }
-
-  /**
-   * Add a job's completed task (either successful or failed/killed) to the 
-   * {@link TaskCommitQueue}. 
-   * @param j completed task (either successful or failed/killed)
-   */
-  void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
-    ((TaskCommitQueue)taskCommitThread).addToQueue(j);
-  }
-  
-  /**
-   * A thread which does all of the {@link FileSystem}-related operations for
-   * tasks. It picks the next task in the queue, promotes outputs of 
-   * {@link TaskStatus.State#SUCCEEDED} tasks & discards outputs for 
-   * {@link TaskStatus.State#FAILED} or {@link TaskStatus.State#KILLED} tasks.
-   */
-  private class TaskCommitQueue extends Thread {
-    
-    private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue = 
-            new LinkedBlockingQueue <JobInProgress.JobWithTaskContext>();
-        
-    public TaskCommitQueue() {
-      setName("Task Commit Thread");
-      setDaemon(true);
-    }
-    
-    public void addToQueue(JobInProgress.JobWithTaskContext j) {
-      while (true) { // loop until the element gets added
-        try {
-          queue.put(j);
-          return;
-        } catch (InterruptedException ie) {}
-      }
-    }
-       
-    @Override
-    public void run() {
-      int  batchCommitSize = conf.getInt("jobtracker.task.commit.batch.size", 
-                                         5000); 
-      while (!isInterrupted()) {
-        try {
-          ArrayList <JobInProgress.JobWithTaskContext> jobList = 
-            new ArrayList<JobInProgress.JobWithTaskContext>(batchCommitSize);
-          // Block if the queue is empty
-          jobList.add(queue.take());  
-          queue.drainTo(jobList, batchCommitSize);
-
-          JobInProgress[] jobs = new JobInProgress[jobList.size()];
-          TaskInProgress[] tips = new TaskInProgress[jobList.size()];
-          TaskAttemptID[] taskids = new TaskAttemptID[jobList.size()];
-          JobTrackerInstrumentation[] metrics = new JobTrackerInstrumentation[jobList.size()];
-
-          Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
-          int count = 0;
-
-          while (iter.hasNext()) {
-            JobInProgress.JobWithTaskContext j = iter.next();
-            jobs[count] = j.getJob();
-            tips[count] = j.getTIP();
-            taskids[count]= j.getTaskID();
-            metrics[count] = j.getJobTrackerMetrics();
-            ++count;
-          }
-
-          Task[] tasks = new Task[jobList.size()];
-          TaskStatus[] status = new TaskStatus[jobList.size()];
-          boolean[] isTipComplete = new boolean[jobList.size()];
-          TaskStatus.State[] states = new TaskStatus.State[jobList.size()];
-
-          synchronized (JobTracker.this) {
-            for(int i = 0; i < jobList.size(); ++i) {
-              synchronized (jobs[i]) {
-                synchronized (tips[i]) {
-                  status[i] = tips[i].getTaskStatus(taskids[i]);
-                  tasks[i] = tips[i].getTask(taskids[i]);
-                  states[i] = status[i].getRunState();
-                  isTipComplete[i] = tips[i].isComplete();
-                }
-              }
-            }
-          }
-
-          //For COMMIT_PENDING tasks, we save the task output in the dfs
-          //as well as manipulate the JT datastructures to reflect a
-          //successful task. This guarantees that we don't declare a task
-          //as having succeeded until we have successfully completed the
-          //dfs operations.
-          //For failed tasks, we just do the dfs operations here. The
-          //datastructures updates is done earlier as soon as the failure
-          //is detected so that the JT can immediately schedule another
-          //attempt for that task.
-
-          Set<TaskID> seenTIPs = new HashSet<TaskID>();
-          for(int index = 0; index < jobList.size(); ++index) {
-            try {
-              if (states[index] == TaskStatus.State.COMMIT_PENDING) {
-                if (!isTipComplete[index]) {
-                  if (!seenTIPs.contains(tips[index].getTIPId())) {
-                    tasks[index].saveTaskOutput();
-                    seenTIPs.add(tips[index].getTIPId());
-                  } else {
-                    // since other task of this tip has saved its output
-                    isTipComplete[index] = true;
-                  }
-                }
-              }
-            } catch (IOException ioe) {
-              // Oops! Failed to copy the task's output to its final place;
-              // fail the task!
-              states[index] = TaskStatus.State.FAILED;
-              synchronized (JobTracker.this) {
-                String reason = "Failed to rename output with the exception: " 
-                                + StringUtils.stringifyException(ioe);
-                TaskStatus.Phase phase = (tips[index].isMapTask() 
-                                          ? TaskStatus.Phase.MAP 
-                                          : TaskStatus.Phase.REDUCE);
-                jobs[index].failedTask(tips[index], status[index].getTaskID(), 
-                                       reason, phase, TaskStatus.State.FAILED, 
-                                       status[index].getTaskTracker(), null);
-              }
-              LOG.info("Failed to rename the output of " 
-                       + status[index].getTaskID() + " with " 
-                       + StringUtils.stringifyException(ioe));
-            }
-          }
-
-          synchronized (JobTracker.this) {
-            //do a check for the case where after the task went to
-            //COMMIT_PENDING, it was lost. So although we would have
-            //saved the task output, we cannot declare it a SUCCESS.
-            for(int i = 0; i < jobList.size(); ++i) {
-              TaskStatus newStatus = null;
-              if(states[i] == TaskStatus.State.COMMIT_PENDING) {
-                synchronized (jobs[i]) {
-                  synchronized (tips[i]) {
-                    status[i] = tips[i].getTaskStatus(taskids[i]);
-                    if (!isTipComplete[i]) {
-                      if (status[i].getRunState() 
-                          != TaskStatus.State.COMMIT_PENDING) {
-                        states[i] = TaskStatus.State.KILLED;
-                      } else {
-                        states[i] = TaskStatus.State.SUCCEEDED;
-                      }
-                    } else {
-                      tips[i].addDiagnosticInfo(tasks[i].getTaskID(), 
-                                                "Already completed  TIP");
-                      states[i] = TaskStatus.State.KILLED;
-                    }
-                    //create new status if required. If the state changed 
-                    //from COMMIT_PENDING to KILLED in the JobTracker, while 
-                    //we were saving the output,the JT would have called 
-                    //updateTaskStatus and we don't need to call it again
-                    newStatus = (TaskStatus)status[i].clone();
-                    newStatus.setRunState(states[i]);
-                    newStatus.setProgress(
-                        (states[i] == TaskStatus.State.SUCCEEDED) 
-                        ? 1.0f 
-                        : 0.0f);
-                  }
-                  if (newStatus != null) {
-                    jobs[i].updateTaskStatus(tips[i], newStatus, metrics[i]);
-                  }
-                }
-              }
-            }
-          }
-        } catch (InterruptedException ie) {
-          break;
-        }
-        catch (Throwable t) {
-          LOG.error(getName() + " got an exception: " +
-                    StringUtils.stringifyException(t));
-        }
-      }
-      
-      LOG.warn(getName() + " exiting..."); 
-    }
-  }
   
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Fri Sep  5 03:50:04 2008
@@ -62,17 +62,32 @@
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
 
+    final int numTaskTrackers =
+        taskTrackerManager.getClusterStatus().getTaskTrackers();
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+    Task task;
+
     /* Stats about the current taskTracker */
     final int mapTasksNumber = taskTracker.countMapTasks();
     final int reduceTasksNumber = taskTracker.countReduceTasks();
     final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
     final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
+
+    // check if cleanup task can be launched
+    synchronized (jobQueue) {
+      task = getCleanupTask(mapTasksNumber, reduceTasksNumber,
+               maximumMapTasksNumber, maximumReduceTasksNumber,
+               taskTracker, numTaskTrackers, jobQueue);
+      if (task != null) {
+        return Collections.singletonList(task);
+      }
+    }
+
     /*
      * Statistics about the whole cluster. Most are approximate because of
      * concurrency
      */
-    final int numTaskTrackers =
-      taskTrackerManager.getClusterStatus().getTaskTrackers();
     final int[] maxMapAndReduceLoad = getMaxMapAndReduceLoad(
         maximumMapTasksNumber, maximumReduceTasksNumber);
     final int maximumMapLoad = maxMapAndReduceLoad[0];
@@ -112,11 +127,8 @@
         continue;
       }
       /* For each job, start its tasks */
-      Collection<JobInProgress> jobQueue =
-        jobQueueJobInProgressListener.getJobQueue();
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
-          Task task;
           /* Ignore non running jobs */
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep  5 03:50:04 2008
@@ -111,18 +111,9 @@
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
-        // create job specific temp directory in output path
-        Path outputPath = FileOutputFormat.getOutputPath(job);
-        FileSystem outputFs = null;
-        Path tmpDir = null;
-        if (outputPath != null) {
-          tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-          outputFs = tmpDir.getFileSystem(job);
-          if (!outputFs.mkdirs(tmpDir)) {
-            LOG.error("Mkdirs failed to create " + tmpDir.toString());
-          }
-        }
-
+        JobContext jContext = new JobContext(conf);
+        OutputCommitter outputCommitter = job.getOutputCommitter();
+        
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
           TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i), 0);  
@@ -136,25 +127,12 @@
                                     splits[i].getClass().getName(),
                                     split);
           JobConf localConf = new JobConf(job);
-          if (outputFs != null) {
-            if (outputFs.exists(tmpDir)) {
-              Path taskTmpDir = new Path(tmpDir, "_" + mapId);
-              if (!outputFs.mkdirs(taskTmpDir)) {
-                throw new IOException("Mkdirs failed to create " 
-                                       + taskTmpDir.toString());
-              }
-            } else {
-              throw new IOException("The directory " + tmpDir.toString()
-                                   + " doesnt exist " );
-            }
-          }
           map.setJobFile(localFile.toString());
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
           myMetrics.launchMap(mapId);
           map.run(localConf, this);
-          map.saveTaskOutput();
           myMetrics.completeMap(mapId);
           map_tasks -= 1;
           updateCounters(map);
@@ -180,25 +158,12 @@
               ReduceTask reduce = new ReduceTask(file.toString(), 
                                                  reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
-              if (outputFs != null) {
-                if (outputFs.exists(tmpDir)) {
-                  Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
-                  if (!outputFs.mkdirs(taskTmpDir)) {
-                    throw new IOException("Mkdirs failed to create " 
-                                           + taskTmpDir.toString());
-                  }
-                } else {
-                  throw new IOException("The directory " + tmpDir.toString()
-                                       + " doesnt exist ");
-                }
-              }
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
               myMetrics.launchReduce(reduce.getTaskID());
               reduce.run(localConf, this);
-              reduce.saveTaskOutput();
               myMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
@@ -213,15 +178,8 @@
           }
         }
         // delete the temporary directory in output directory
-        try {
-          if (outputFs != null) {
-            if (outputFs.exists(tmpDir)) {
-              outputFs.delete(tmpDir, true);
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("Exception in deleting " + tmpDir.toString());
-        }
+        outputCommitter.cleanupJob(jContext);
+        status.setCleanupProgress(1.0f);
 
         this.status.setRunState(JobStatus.SUCCEEDED);
 
@@ -265,6 +223,16 @@
     }
 
     /**
+     * Records that a task has entered COMMIT_PENDING and is waiting for
+     * the commit response. The local runner passes the reported status
+     * to statusUpdate unchanged.
+     */
+    public void commitPending(TaskAttemptID taskid, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      statusUpdate(taskid, taskStatus);
+    }
+
+    /**
      * Updates counters corresponding to completed tasks.
      * @param task A map or reduce task which has just been 
      * successfully completed
@@ -285,8 +253,13 @@
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
     }
-
-    public void done(TaskAttemptID taskId, boolean shouldPromote) throws IOException {
+    
+    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+      // the local runner grants the commit to every attempt it runs
+      return true;
+    }
+    
+    public void done(TaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
         status.setMapProgress(1.0f);
@@ -352,6 +325,9 @@
   public TaskReport[] getReduceTaskReports(JobID id) {
     return new TaskReport[0];
   }
+  public TaskReport[] getCleanupTaskReports(JobID id) {
+    return new TaskReport[0]; // cleanup runs inline in Job.run, no tasks
+  }
 
   public JobStatus getJobStatus(JobID id) {
     Job job = jobs.get(id);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Fri Sep  5 03:50:04 2008
@@ -61,10 +61,5 @@
    */
   public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
 
-  /**
-   * Temporary directory name 
-   */
-  public static final String TEMP_DIR_NAME = "_temporary";
-  
   public static final String WORKDIR = "work";
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Sep  5 03:50:04 2008
@@ -31,7 +31,6 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -266,6 +265,13 @@
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
 
+    initialize(job, reporter);
+    // check if it is a cleanupJobTask
+    if (cleanupJob) {
+      runCleanup(umbilical);
+      return;
+    }
+
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java Fri Sep  5 03:50:04 2008
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * <code>OutputCommitter</code> describes the commit of task output for a
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
+ * the job to:</p>
+ * <ol>
+ *   <li>
+ *   Set up the job during initialization. For example, create the temporary
+ *   output directory for the job during the initialization of the job.
+ *   The job client does the setup for the job.
+ *   </li>
+ *   <li>
+ *   Clean up the job after the job completion. For example, remove the
+ *   temporary output directory after the job completion. CleanupJob is done
+ *   by a separate task at the end of the job.
+ *   </li>
+ *   <li>
+ *   Set up the task's temporary output.
+ *   </li>
+ *   <li>
+ *   Check whether a task needs a commit. This is to avoid the commit
+ *   procedure if a task does not need commit.
+ *   </li>
+ *   <li>
+ *   Commit the task output.
+ *   </li>
+ *   <li>
+ *   Discard the task output.
+ *   </li>
+ * </ol>
+ *
+ * @see FileOutputCommitter
+ * @see JobContext
+ * @see TaskAttemptContext
+ *
+ */
+public abstract class OutputCommitter {
+  /**
+   * For the framework to set up the job output during initialization.
+   *
+   * The job client does the setup for the job.
+   *
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException if temporary output could not be created
+   */
+  public abstract void setupJob(JobContext jobContext) throws IOException;
+
+  /**
+   * For cleaning up the job's output after job completion.
+   *
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException if the job output could not be cleaned up
+   */
+  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+
+  /**
+   * Sets up output for the task.
+   *
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException if the task output could not be set up
+   */
+  public abstract void setupTask(TaskAttemptContext taskContext)
+  throws IOException;
+
+  /**
+   * Check whether the task needs a commit.
+   *
+   * @param taskContext Context of the task whose output is being written.
+   * @return true if {@link #commitTask} should be called for this task
+   * @throws IOException if the check could not be performed
+   */
+  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
+  throws IOException;
+
+  /**
+   * Promote the task's temporary output to the final output location.
+   *
+   * Typically the task's output is moved to the job's output directory.
+   *
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException if the task output could not be committed
+   */
+  public abstract void commitTask(TaskAttemptContext taskContext)
+  throws IOException;
+
+  /**
+   * Discard the task output.
+   *
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException if the task output could not be discarded
+   */
+  public abstract void abortTask(TaskAttemptContext taskContext)
+  throws IOException;
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Sep  5 03:50:04 2008
@@ -69,7 +69,6 @@
 import org.apache.hadoop.mapred.IFile.*;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
-import org.apache.hadoop.mapred.Task.Counter;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -326,7 +325,15 @@
 
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
+    final Reporter reporter = getReporter(umbilical);
+    initialize(job, reporter);
 
+    // check if it is a cleanupJobTask
+    if (cleanupJob) {
+      runCleanup(umbilical);
+      return;
+    }
+    
     FileSystem lfs = FileSystem.getLocal(job);
     
     // Initialize the codec
@@ -350,7 +357,6 @@
  
     setPhase(TaskStatus.Phase.SORT); 
 
-    final Reporter reporter = getReporter(umbilical);
     
     // sort the input file
     LOG.info("Initiating final on-disk merge with " + mapFiles.length + 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Sep  5 03:50:04 2008
@@ -85,6 +85,15 @@
   public float reduceProgress() throws IOException;
 
   /**
+   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
+   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's cleanup-tasks.
+   * @throws IOException if the cleanup progress could not be retrieved
+   */
+  public float cleanupProgress() throws IOException;
+
+  /**
    * Check if the job is finished or not. 
    * This is a non-blocking call.
    * 



Mime
View raw message