hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r781255 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Wed, 03 Jun 2009 03:51:21 GMT
Author: ddas
Date: Wed Jun  3 03:51:21 2009
New Revision: 781255

URL: http://svn.apache.org/viewvc?rev=781255&view=rev
Log:
HADOOP-5924. Fixes a corner case problem to do with job recovery with empty history files.
Also, after a JT restart, sends KillTaskAction to tasks that report back but the corresponding
job hasn't been initialized yet. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun  3 03:51:21 2009
@@ -863,6 +863,11 @@
     HADOOP-5908. Fixes a problem to do with ArithmeticException in the 
     JobTracker when there are jobs with 0 maps. (Amar Kamat via ddas)
 
+    HADOOP-5924. Fixes a corner case problem to do with job recovery with
+    empty history files. Also, after a JT restart, sends KillTaskAction to 
+    tasks that report back but the corresponding job hasn't been initialized
+    yet. (Amar Kamat via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Wed Jun  3 03:51:21
2009
@@ -936,19 +936,16 @@
   throws IOException {
     FileStatus[] contents = fs.listStatus(jobDirPath);
     int matchCount = 0;
-    if (contents != null && contents.length >=3) {
+    if (contents != null && contents.length >=2) {
       for (FileStatus status : contents) {
         if ("job.xml".equals(status.getPath().getName())) {
           ++matchCount;
         }
-        if ("job.jar".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
         if ("job.split".equals(status.getPath().getName())) {
           ++matchCount;
         }
       }
-      if (matchCount == 3) {
+      if (matchCount == 2) {
         return true;
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed Jun  3 03:51:21
2009
@@ -875,6 +875,11 @@
         String logFileName = null;
         if (restarted) {
           logFileName = getJobHistoryFileName(jobConf, jobId);
+          if (logFileName == null) {
+            logFileName =
+              encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+            
+          }
         } else {
           logFileName = 
             encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Jun  3 03:51:21
2009
@@ -1439,6 +1439,10 @@
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
     new HashMap<String, Set<JobID>>();
   
+  // (trackerID --> list of tasks to cleanup)
+  Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup = 
+    new HashMap<String, Set<TaskAttemptID>>();
+  
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -2823,8 +2827,8 @@
                                                               String taskTracker) {
     
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
+    List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
     if (taskIds != null) {
-      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
         if (tip == null) {
@@ -2842,10 +2846,18 @@
           }
         }
       }
-            
-      return killList;
     }
-    return null;
+    
+    // add the stray attempts for uninited jobs
+    synchronized (trackerToTasksToCleanup) {
+      Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
+      if (set != null) {
+        for (TaskAttemptID id : set) {
+          killList.add(new KillTaskAction(id));
+        }
+      }
+    }
+    return killList;
   }
 
   /**
@@ -3521,6 +3533,19 @@
         continue;
       }
       
+      if (!job.inited()) {
+        // if job is not yet initialized ... kill the attempt
+        synchronized (trackerToTasksToCleanup) {
+          Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
+          if (tasks == null) {
+            tasks = new HashSet<TaskAttemptID>();
+            trackerToTasksToCleanup.put(trackerName, tasks);
+          }
+          tasks.add(taskId);
+        }
+        continue;
+      }
+
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
@@ -3585,6 +3610,10 @@
       trackerToJobsToCleanup.remove(trackerName);
     }
     
+    synchronized (trackerToTasksToCleanup) {
+      trackerToTasksToCleanup.remove(trackerName);
+    }
+    
     // Inform the recovery manager
     recoveryManager.unMarkTracker(trackerName);
     

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
(original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Wed Jun  3 03:51:21 2009
@@ -34,11 +34,11 @@
 * recover previously submitted jobs.
  */
 public class TestJobTrackerRestart extends TestCase {
-  final Path testDir = 
+  static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "jt-restart-testing");
   final Path inDir = new Path(testDir, "input");
-  final Path shareDir = new Path(testDir, "share");
+  static final Path shareDir = new Path(testDir, "share");
   final Path outputDir = new Path(testDir, "output");
   private static int numJobsSubmitted = 0;
   
@@ -400,6 +400,115 @@
            && status.getReduceTasks() == 0;
   }
   
+  /** Committer with setup waiting
+   */
+  static class CommitterWithDelaySetup extends FileOutputCommitter {
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      FileSystem fs = FileSystem.get(context.getConfiguration());
+      while (true) {
+        if (fs.exists(shareDir)) {
+          break;
+        }
+        UtilsForTests.waitFor(100);
+      }
+      super.cleanupJob(context);
+    }
+  }
+
+  /** Tests a job on jobtracker with restart-recovery turned on and empty 
+   *  jobhistory file.
+   * Preparation :
+   *    - Configure a job with
+   *       - num-maps : 0 (long waiting setup)
+   *       - num-reducers : 0
+   *    
+   * Check if the job succeeds after restart.
+   * 
+   * Assumption that map slots are given first for setup.
+   */
+  public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, 
+                                              MiniMRCluster mr) 
+  throws IOException {
+    mr.startTaskTracker(null, null, 1, 1);
+    FileSystem fileSys = dfs.getFileSystem();
+    
+    cleanUp(fileSys, shareDir);
+    cleanUp(fileSys, inDir);
+    cleanUp(fileSys, outputDir);
+    
+    JobConf conf = mr.createJobConf();
+    conf.setNumReduceTasks(0);
+    conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
+    fileSys.delete(outputDir, false);
+    RunningJob job1 = 
+      UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
+    
+    conf.setNumReduceTasks(0);
+    conf.setOutputCommitter(CommitterWithDelaySetup.class);
+    Path inDir2 = new Path(testDir, "input2");
+    fileSys.mkdirs(inDir2);
+    Path outDir2 = new Path(testDir, "output2");
+    fileSys.delete(outDir2, false);
+    JobConf newConf = getJobs(mr.createJobConf(),
+                              new JobPriority[] {JobPriority.NORMAL},
+                              new int[] {10}, new int[] {0},
+                              outDir2, inDir2,
+                              getMapSignalFile(shareDir),
+                              getReduceSignalFile(shareDir))[0];
+
+    JobClient jobClient = new JobClient(newConf);
+    RunningJob job2 = jobClient.submitJob(newConf);
+    JobID id = job2.getID();
+
+    /*RunningJob job2 = 
+      UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
+    
+    JobID id = job2.getID();*/
+    JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+    
+    jip.initTasks();
+    
+    // find out the history filename
+    String history = 
+      JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
+    Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
+    
+    //  make sure that setup is launched
+    while (jip.runningMaps() == 0) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    id = job1.getID();
+    jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+    
+    jip.initTasks();
+    
+    //  make sure that cleanup is launched and is waiting
+    while (!jip.isCleanupLaunched()) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    mr.stopJobTracker();
+    
+    // delete the history file .. just to be safe.
+    FileSystem historyFS = historyPath.getFileSystem(conf);
+    historyFS.delete(historyPath, false);
+    historyFS.create(historyPath).close(); // create an empty file
+    
+    
+    UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir),
(short)1);
+
+    // Turn on the recovery
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+    
+    mr.startJobTracker();
+    
+    job1.waitForCompletion();
+    job2.waitForCompletion();
+  }
+  
   public void testJobTrackerRestart() throws IOException {
     String namenode = null;
     MiniDFSCluster dfs = null;
@@ -450,6 +559,9 @@
       
       // Test jobtracker with restart-recovery turned off
       testRestartWithoutRecovery(dfs, mr);
+      
+      // test recovery with empty file
+      testJobRecoveryWithEmptyHistory(dfs, mr);
     } finally {
       if (mr != null) {
         try {



Mime
View raw message