hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1179191 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/jav...
Date Wed, 05 Oct 2011 12:17:47 GMT
Author: vinodkv
Date: Wed Oct  5 12:17:46 2011
New Revision: 1179191

URL: http://svn.apache.org/viewvc?rev=1179191&view=rev
Log:
MAPREDUCE-2702. svn merge -c r1179188 --ignore-ancestry ../../trunk/

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/
      - copied from r1179188, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/
      - copied from r1179188, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
      - copied unchanged from r1179188, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1179191&r1=1179190&r2=1179191&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Oct  5 12:17:46 2011
@@ -315,6 +315,10 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2907. Changed log level for various messages in ResourceManager
     from INFO to DEBUG. (Ravi Prakash via vinodkv)
 
+    MAPREDUCE-2702. Added a new API in OutputCommitter for recovering
+    the outputs of tasks from a crashed job so as to support MR Application
+    Master recovery. (Sharad Agarwal and Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1179191&r1=1179190&r2=1179191&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Oct  5 12:17:46 2011
@@ -473,4 +473,6 @@ public interface MRJobConfig {
   public static final String MAPREDUCE_V2_CHILD_CLASS = 
       "org.apache.hadoop.mapred.YarnChild";
 
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1179191&r1=1179190&r2=1179191&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Wed Oct  5 12:17:46 2011
@@ -143,4 +143,35 @@ public abstract class OutputCommitter {
    */
   public abstract void abortTask(TaskAttemptContext taskContext)
   throws IOException;
+
+  /**
+   * Is task output recovery supported for restarting jobs?
+   * 
+   * If task output recovery is supported, job restart can be done more 
+   * efficiently.
+   * 
+   * @return <code>true</code> if task output recovery is supported,
+   *         <code>false</code> otherwise
+   * @see #recoverTask(TaskAttemptContext)         
+   */
+  public boolean isRecoverySupported() {
+    return false;
+  }
+  
+  /**
+   * Recover the task output. 
+   * 
+   * The retry-count for the job will be passed via the 
+   * {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in  
+   * {@link TaskAttemptContext#getConfiguration()} for the 
+   * <code>OutputCommitter</code>.
+   * 
+   * If an exception is thrown the task will be attempted again. 
+   * 
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException
+   */
+  public void recoverTask(TaskAttemptContext taskContext)
+  throws IOException
+  {}
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1179191&r1=1179190&r2=1179191&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Wed Oct  5 12:17:46 2011
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.StringUtils;
 
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
@@ -69,9 +68,8 @@ public class FileOutputCommitter extends
       this.outputPath = outputPath;
       outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
       workPath = new Path(outputPath,
-                          (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-                           "_" + context.getTaskAttemptID().toString()
-                           )).makeQualified(outputFileSystem);
+                          getTaskAttemptBaseDirName(context))
+                          .makeQualified(outputFileSystem);
     }
   }
 
@@ -82,7 +80,8 @@ public class FileOutputCommitter extends
    */
   public void setupJob(JobContext context) throws IOException {
     if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + 
+    		  Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (!fileSys.mkdirs(tmpDir)) {
         LOG.error("Mkdirs failed to create " + tmpDir.toString());
@@ -106,11 +105,27 @@ public class FileOutputCommitter extends
   }
   
   /**
+   * Move all job output to the final place.
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
+    //delete the task temp directory from the current jobtempdir
+    Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+        Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+    FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+    if (fileSys.exists(tmpDir)) {
+      fileSys.delete(tmpDir, true);
+    } else {
+      LOG.warn("Task temp dir could not be deleted " + tmpDir);
+    }
+    
+	  //move the job output to final place
+    Path jobOutputPath = 
+        new Path(outputPath, getJobAttemptBaseDirName(context));
+	  moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
+	  
     // delete the _temporary folder and create a _done file in the o/p folder
     cleanupJob(context);
     if (shouldMarkOutputDir(context.getConfiguration())) {
@@ -118,6 +133,31 @@ public class FileOutputCommitter extends
     }
   }
 
+  private void moveJobOutputs(FileSystem fs,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    if (fs.isFile(jobOutput)) {
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      if (!fs.rename(jobOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of job");
+        }
+        if (!fs.rename(jobOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of job");
+        }
+      }
+      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      FileStatus[] paths = fs.listStatus(jobOutput);
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveJobOutputs(fs, finalOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
@@ -163,8 +203,10 @@ public class FileOutputCommitter extends
     if (workPath != null) {
       context.progress();
       if (outputFileSystem.exists(workPath)) {
-        // Move the task outputs to their final place
-        moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
+        // Move the task outputs to the current job attempt output dir
+    	  Path jobOutputPath = 
+    	      new Path(outputPath, getJobAttemptBaseDirName(context));
+        moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
         // Delete the temporary task-specific output directory
         if (!outputFileSystem.delete(workPath, true)) {
           LOG.warn("Failed to delete the temporary output" + 
@@ -271,4 +313,50 @@ public class FileOutputCommitter extends
   public Path getWorkPath() throws IOException {
     return workPath;
   }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return true;
+  }
+  
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    context.progress();
+    Path jobOutputPath = 
+        new Path(outputPath, getJobAttemptBaseDirName(context));
+    int previousAttempt =         
+        context.getConfiguration().getInt(
+            MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
+    if (previousAttempt < 0) {
+      throw new IOException ("Cannot recover task output for first attempt...");
+    }
+
+    Path pathToRecover = 
+        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    if (outputFileSystem.exists(pathToRecover)) {
+      // Move the task outputs to their final place
+      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      LOG.info("Saved output of job to " + jobOutputPath);
+    }
+  }
+
+  protected static String getJobAttemptBaseDirName(JobContext context) {
+    int appAttemptId = 
+        context.getConfiguration().getInt(
+            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+    return getJobAttemptBaseDirName(appAttemptId);
+  }
+
+  protected static String getJobAttemptBaseDirName(int appAttemptId) {
+    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + 
+      + appAttemptId;
+  }
+
+  protected static String getTaskAttemptBaseDirName(
+      TaskAttemptContext context) {
+	  return getJobAttemptBaseDirName(context) + Path.SEPARATOR + 
+	  FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+      "_" + context.getTaskAttemptID().toString();
+  }
 }



Mime
View raw message