hadoop-common-commits mailing list archives

From: tomwh...@apache.org
Subject: svn commit: r1430876 - in /hadoop/common/branches/branch-1: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Wed, 09 Jan 2013 15:01:10 GMT
Author: tomwhite
Date: Wed Jan  9 15:01:10 2013
New Revision: 1430876

URL: http://svn.apache.org/viewvc?rev=1430876&view=rev
Log:
MAPREDUCE-4850. Job recovery may fail if staging directory has been deleted.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Jan  9 15:01:10 2013
@@ -413,6 +413,9 @@ Release 1.2.0 - unreleased
     HADOOP-9191. TestAccessControlList and TestJobHistoryConfig fail with
     JDK7. (Arpit Agarwal via suresh)
 
+    MAPREDUCE-4850. Job recovery may fail if staging directory has been
+    deleted. (tomwhite)
+
 Release 1.1.2 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Wed Jan  9 15:01:10 2013
@@ -56,10 +56,17 @@ public class CleanupQueue {
   static class PathDeletionContext {
     final Path fullPath;// full path of file or dir
     final Configuration conf;
+    final UserGroupInformation ugi;
 
     public PathDeletionContext(Path fullPath, Configuration conf) {
+      this(fullPath, conf, null);
+    }
+
+    public PathDeletionContext(Path fullPath, Configuration conf,
+        UserGroupInformation ugi) {
       this.fullPath = fullPath;
       this.conf = conf;
+      this.ugi = ugi;
     }
     
     protected Path getPathForCleanup() {
@@ -72,7 +79,7 @@ public class CleanupQueue {
      */
     protected void deletePath() throws IOException, InterruptedException {
       final Path p = getPathForCleanup();
-      UserGroupInformation.getLoginUser().doAs(
+      (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
           new PrivilegedExceptionAction<Object>() {
             public Object run() throws IOException {
              p.getFileSystem(conf).delete(p, true);
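
The new three-argument PathDeletionContext lets the cleanup thread delete a path as the job owner rather than as the JobTracker's login user, which matters when the staging directory lives under the submitting user's home. A minimal standalone sketch of that doAs deletion pattern, assuming the caller already holds the owner's UserGroupInformation (the class and method names below are illustrative, not part of this commit):

  import java.io.IOException;
  import java.security.PrivilegedExceptionAction;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.security.UserGroupInformation;

  public class DeleteAsUserSketch {
    // Delete 'path' recursively as 'ugi', falling back to the login user when
    // no UGI is supplied -- the same decision the patched deletePath() makes.
    static void deleteAsUser(final Path path, final Configuration conf,
        UserGroupInformation ugi) throws IOException, InterruptedException {
      UserGroupInformation effectiveUgi =
          (ugi == null) ? UserGroupInformation.getLoginUser() : ugi;
      effectiveUgi.doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws IOException {
          path.getFileSystem(conf).delete(path, true);
          return null;
        }
      });
    }
  }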

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Jan  9 15:01:10 2013
@@ -3250,7 +3250,17 @@ public class JobInProgress {
 
         Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
         CleanupQueue.getInstance().addToQueue(
-            new PathDeletionContext(tempDir, conf)); 
+            new PathDeletionContext(tempDir, conf));
+
+        // delete the staging area for the job
+        String jobTempDir = conf.get("mapreduce.job.dir");
+        if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
+            !conf.getKeepFailedTaskFiles()) {
+          Path jobTempDirPath = new Path(jobTempDir);
+          CleanupQueue.getInstance().addToQueue(
+              new PathDeletionContext(jobTempDirPath, conf, userUGI));
+        }
+
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
       }
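
With this hunk the staging directory (the path stored under "mapreduce.job.dir") is cleaned up by the JobTracker under the job owner's UGI instead of by the task in Task.commit() (removed in the Task.java hunk below); deferring the delete until job cleanup is what keeps recovery from failing on a missing staging directory (MAPREDUCE-4850). The deletion is skipped when either keep-files setting is on. A small sketch, using the standard JobConf setters, of how a client could keep the staging directory around for inspection (the pattern string is made up for illustration):

  import org.apache.hadoop.mapred.JobConf;

  public class KeepStagingFilesSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf();
      // Either setting makes the check above skip the staging-directory cleanup.
      conf.setKeepFailedTaskFiles(true);               // keep files when tasks fail
      conf.setKeepTaskFilesPattern(".*_m_000000_.*");  // or keep files matching a pattern
    }
  }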

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java Wed Jan  9 15:01:10 2013
@@ -1071,14 +1071,6 @@ abstract public class Task implements Wr
                             + JobStatus.State.FAILED + " or "
                             + JobStatus.State.KILLED);
     }
-    // delete the staging area for the job
-    JobConf conf = new JobConf(jobContext.getConfiguration());
-    if (!supportIsolationRunner(conf)) {
-      String jobTempDir = conf.get("mapreduce.job.dir");
-      Path jobTempDirPath = new Path(jobTempDir);
-      FileSystem fs = jobTempDirPath.getFileSystem(conf);
-      fs.delete(jobTempDirPath, true);
-    }
     done(umbilical, reporter);
   }
 

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Wed Jan  9 15:01:10 2013
@@ -21,11 +21,13 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+import java.util.concurrent.CountDownLatch;
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -53,25 +55,27 @@ public class TestRecoveryManager {
   private MiniMRCluster mr;
 
   @Before
-  public void setUp() {
-    JobConf conf = new JobConf();
-    try {
-      fs = FileSystem.get(new Configuration());
-      fs.delete(TEST_DIR, true);
-      conf.set("mapred.jobtracker.job.history.block.size", "1024");
-      conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-      mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+  public void setUp() throws IOException {
+    fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true);
+  }
+
+  private void startCluster() throws IOException {
+    startCluster(new JobConf());
+  }
+
+  private void startCluster(JobConf conf) throws IOException {
+    mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
   }
 
   @After
   public void tearDown() {
-    ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
-        .getClusterStatus(false);
-    if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
-      mr.shutdown();
+    if (mr != null) {
+      ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+          .getClusterStatus(false);
+      if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+        mr.shutdown();
+      }
     }
   }
   
@@ -87,6 +91,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
     LOG.info("Testing jobtracker restart with faulty job");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
 
     JobConf job1 = mr.createJobConf();
@@ -168,6 +173,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobResubmission() throws Exception {
     LOG.info("Testing Job Resubmission");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
 
     // make sure that the jobtracker is in recovery mode
@@ -216,6 +222,73 @@ public class TestRecoveryManager {
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
   }
 
+  public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
+    static CountDownLatch finalizeCall = new CountDownLatch(1);
+
+    public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    public void finalizeJob(JobConf conf, JobID id) {
+      if (finalizeCall.getCount() == 0) {
+        return;
+      }
+      finalizeCall.countDown();
+      throw new IllegalStateException("Controlled error finalizing job");
+    }
+  }
+
+  @Test
+  public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
+    LOG.info("Testing Job Resubmission");
+
+    JobConf conf = new JobConf();
+    // make sure that the jobtracker is in recovery mode
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+    // use a test JobTrackerInstrumentation implementation to shut down
+    // the jobtracker after the tasks have all completed, but
+    // before the job is finalized and check that it can be recovered correctly
+    conf.setClass("mapred.jobtracker.instrumentation", TestJobTrackerInstrumentation.class,
+            JobTrackerInstrumentation.class);
+
+    startCluster(conf);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    SleepJob job = new SleepJob();
+    job.setConf(mr.createJobConf());
+    JobConf job1 = job.setupJobConf(1, 0, 1, 1, 1, 1);
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    TestJobTrackerInstrumentation.finalizeCall.await();
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // assert that job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1,
+        jobtracker.getAllJobs().length);
+
+    // wait for job 1 to complete
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
+
   /**
    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown 
    * during recovery. It does the following :
@@ -231,6 +304,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobTrackerRestartWithBadJobs() throws Exception {
     LOG.info("Testing recovery-manager");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf()
@@ -362,6 +436,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testRestartCount() throws Exception {
     LOG.info("Testing Job Restart Count");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf()
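
One detail of the new test worth noting: TestJobTrackerInstrumentation uses a one-shot CountDownLatch so that finalizeJob() throws exactly once (simulating a JobTracker failure before the job is finalized) and behaves normally on every call after the restart, while the test thread awaits the latch to learn when to stop the JobTracker. A stripped-down sketch of that fail-once pattern in isolation (class and method names here are illustrative, not Hadoop APIs):

  import java.util.concurrent.CountDownLatch;

  public class FailOnceHookSketch {
    // Counts down and throws on the first call only; later calls are no-ops.
    static final CountDownLatch firstCall = new CountDownLatch(1);

    public void onFinalize() {
      if (firstCall.getCount() == 0) {
        return;                       // already failed once; behave normally
      }
      firstCall.countDown();          // let the waiting thread proceed
      throw new IllegalStateException("Controlled error finalizing job");
    }

    public static void main(String[] args) throws InterruptedException {
      final FailOnceHookSketch hook = new FailOnceHookSketch();
      Thread caller = new Thread(new Runnable() {
        public void run() {
          try { hook.onFinalize(); } catch (IllegalStateException expected) { }
          hook.onFinalize();          // second call succeeds silently
        }
      });
      caller.start();
      firstCall.await();              // observer learns the failure was injected
      caller.join();
    }
  }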


