hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject svn commit: r1075569 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/lib/chain/ src/java/org/apache/hadoop/mapreduce/lib/map/ src/java/org/apache/hadoop/mapr...
Date Mon, 28 Feb 2011 22:40:02 GMT
Author: schen
Date: Mon Feb 28 22:40:02 2011
New Revision: 1075569

URL: http://svn.apache.org/viewvc?rev=1075569&view=rev
Log:
MAPREDUCE-2206. The task-cleanup tasks should be optional. (schen)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Feb 28 22:40:02 2011
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
     MAPREDUCE-1927. Unit test for HADOOP-6835 (concatenated gzip support).
     (Greg Roelofs via tomwhite)
 
+    MAPREDUCE-2206. The task-cleanup tasks should be optional. (schen)
+
   OPTIMIZATIONS
     
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Feb 28 22:40:02 2011
@@ -146,6 +146,7 @@ public class JobInProgress {
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
   private final boolean jobSetupCleanupNeeded;
+  private final boolean taskCleanupNeeded;
 
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
@@ -360,6 +361,8 @@ public class JobInProgress {
         MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
     this.jobSetupCleanupNeeded = conf.getBoolean(
         MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+    this.taskCleanupNeeded = conf.getBoolean(
+        MRJobConfig.TASK_CLEANUP_NEEDED, true);
     if (tracker != null) { // Some mock tests have null tracker
       this.jobHistory = tracker.getJobHistory();
     }
@@ -369,6 +372,7 @@ public class JobInProgress {
   JobInProgress(JobConf conf) {
     restartCount = 0;
     jobSetupCleanupNeeded = false;
+    taskCleanupNeeded = true;
 
     this.memoryPerMap = conf.getMemoryForMapTask();
     this.memoryPerReduce = conf.getMemoryForReduceTask();
@@ -449,6 +453,7 @@ public class JobInProgress {
           numMapTasks + numReduceTasks + 10);
       JobContext jobContext = new JobContextImpl(conf, jobId);
       this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
+      this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
 
       // Construct the jobACLs
       status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
@@ -1078,12 +1083,12 @@ public class JobInProgress {
       status.setRunState(TaskStatus.State.KILLED);
     }
     
-    // If the job is complete and a task has just reported its 
-    // state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
+    // If the job is complete or task-cleanup is switched off
+    // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
     // make the task's state FAILED/KILLED without launching cleanup attempt.
     // Note that if task is already a cleanup attempt, 
     // we don't change the state to make sure the task gets a killTaskAction
-    if ((this.isComplete() || jobFailed || jobKilled) && 
+    if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) && 
         !tip.isCleanupAttempt(taskid)) {
       if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
         status.setRunState(TaskStatus.State.FAILED);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Mon Feb 28 22:40:02 2011
@@ -182,6 +182,13 @@ public interface JobContext extends MRJo
    * @return boolean 
    */
   public boolean getJobSetupCleanupNeeded();
+  
+  /**
+   * Get whether task-cleanup is needed for the job 
+   * 
+   * @return boolean 
+   */
+  public boolean getTaskCleanupNeeded();
 
   /**
    * Get whether the task profiling is enabled.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Feb 28 22:40:02 2011
@@ -40,6 +40,8 @@ public interface MRJobConfig {
 
   public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";
 
+  public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
+
   public static final String JAR = "mapreduce.job.jar";
 
   public static final String ID = "mapreduce.job.id";

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon Feb 28 22:40:02 2011
@@ -198,6 +198,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
+  public boolean getTaskCleanupNeeded() {
+    return base.getTaskCleanupNeeded();
+  }
+
+  @Override
   public Path[] getLocalCacheArchives() throws IOException {
     return base.getLocalCacheArchives();
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon Feb 28 22:40:02 2011
@@ -191,6 +191,11 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
+  public boolean getTaskCleanupNeeded() {
+    return base.getTaskCleanupNeeded();
+  }
+
+  @Override
   public Path[] getLocalCacheArchives() throws IOException {
     return base.getLocalCacheArchives();
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon Feb 28 22:40:02 2011
@@ -200,6 +200,11 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
+    public boolean getTaskCleanupNeeded() {
+      return mapContext.getTaskCleanupNeeded();
+    }
+
+    @Override
     public Path[] getLocalCacheArchives() throws IOException {
       return mapContext.getLocalCacheArchives();
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon Feb 28 22:40:02 2011
@@ -193,6 +193,11 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
+    public boolean getTaskCleanupNeeded() {
+      return reduceContext.getTaskCleanupNeeded();
+    }
+
+    @Override
     public Path[] getLocalCacheArchives() throws IOException {
       return reduceContext.getLocalCacheArchives();
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Mon Feb 28 22:40:02 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.Partitioner;
@@ -263,7 +264,16 @@ public class JobContextImpl implements J
    * @return boolean 
    */
   public boolean getJobSetupCleanupNeeded() {
-    return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
+    return conf.getBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+  }
+  
+  /**
+   * Get whether task-cleanup is needed for the job 
+   * 
+   * @return boolean 
+   */
+  public boolean getTaskCleanupNeeded() {
+    return conf.getBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, true);
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Mon Feb 28 22:40:02 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -125,7 +126,8 @@ public class TestTaskFail extends TestCa
   }
   
   private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId, 
-                               TaskStatus ts, boolean isCleanup, JobTracker jt) 
+                               TaskStatus ts, boolean isCleanup,
+                               boolean containsCleanupLog, JobTracker jt) 
   throws IOException {
     assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
     assertTrue(ts != null);
@@ -142,11 +144,12 @@ public class TestTaskFail extends TestCa
         "&filter=STDERR";
     assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
         .getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
-    if (!isCleanup) {
+    if (containsCleanupLog) {
       // validate task logs: tasklog should contain both task logs
       // and cleanup logs
       assertTrue(log.contains(cleanupLog));
-    } else {
+    }
+    if (isCleanup) {
       // validate tasklogs for cleanup attempt
       log = MapReduceTestUtil.readTaskLog(
                  TaskLog.LogName.STDERR, attemptId, true);
@@ -169,7 +172,7 @@ public class TestTaskFail extends TestCa
     }
   }
 
-  private void validateJob(RunningJob job, JobTracker jt) 
+  private void validateJob(RunningJob job, JobTracker jt, boolean cleanupNeeded) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 	    
@@ -181,19 +184,21 @@ public class TestTaskFail extends TestCa
       new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
     TaskInProgress tip = jt.getTip(attemptId.getTaskID());
     TaskStatus ts = jt.getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, false, jt);
+    // task logs will contain cleanup message because the task is failed by
+    // throwing IOException
+    validateAttempt(tip, attemptId, ts, false, true, jt);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
     ts = jt.getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true, jt);
+    validateAttempt(tip, attemptId, ts, cleanupNeeded, false, jt);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
     // this should be cleanup attempt since the third attempt fails
     // with Error
     ts = jt.getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true, jt);
+    validateAttempt(tip, attemptId, ts, cleanupNeeded, false, jt);
   }
   
   public void testWithDFS() throws IOException {
@@ -219,18 +224,25 @@ public class TestTaskFail extends TestCa
       jobConf.setOutputCommitter(CommitterWithLogs.class);
       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, jt);
+      validateJob(rJob, jt, true);
       // launch job with fail tasks and fail-cleanups
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, jt);
+      validateJob(rJob, jt, true);
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, jt);
+      validateJob(rJob, jt, true);
+      // launch job with task-cleanup switched off
+      fileSys.delete(outDir, true);
+      jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
+      jobConf.setBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, false);
+      rJob = launchJob(jobConf, inDir, outDir, input);
+      rJob.waitForCompletion();
+      validateJob(rJob, jt, false);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }



Mime
View raw message