hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r900815 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/java/or...
Date Tue, 19 Jan 2010 15:18:08 GMT
Author: yhemanth
Date: Tue Jan 19 15:18:07 2010
New Revision: 900815

URL: http://svn.apache.org/viewvc?rev=900815&view=rev
Log:
MAPREDUCE-1316. Fixes a memory leak of TaskInProgress instances in the jobtracker. Contributed
by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
    hadoop/mapreduce/trunk/src/webapps/job/jobdetails.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jan 19 15:18:07 2010
@@ -1127,3 +1127,6 @@
     MAPREDUCE-1342. Fixed deadlock in global blacklisting of tasktrackers.
     (Amareshwari Sriramadasu via acmurthy)
 
+    MAPREDUCE-1316. Fixes a memory leak of TaskInProgress instances in
+    the jobtracker. (Amar Kamat via yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Tue Jan 19 15:18:07 2010
@@ -601,7 +601,7 @@
       //Check if job supports speculative map execution first then
       //check if job has speculative maps.
       return (job.getJobConf().getMapSpeculativeExecution())&& (
-          hasSpeculativeTask(job.getMapTasks(),
+          hasSpeculativeTask(job.getTasks(TaskType.MAP),
                              tts));
     }
 
@@ -644,7 +644,7 @@
       //check if the job supports reduce speculative execution first then
       //check if the job has speculative tasks.
       return (job.getJobConf().getReduceSpeculativeExecution()) && (
-          hasSpeculativeTask(job.getReduceTasks(),
+          hasSpeculativeTask(job.getTasks(TaskType.REDUCE),
                              tts));
     }
 

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
Tue Jan 19 15:18:07 2010
@@ -58,7 +58,7 @@
       // - have N attempts running, in which case it demands N slots, and may
       //   potentially demand one more slot if it needs to be speculated
       TaskInProgress[] tips = (taskType == TaskType.MAP ? 
-          job.getMapTasks() : job.getReduceTasks());
+          job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE));
       boolean speculationEnabled = (taskType == TaskType.MAP ?
           job.hasSpeculativeMaps() : job.hasSpeculativeReduces());
       long time = scheduler.getClock().getTime();

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Tue Jan 19 15:18:07 2010
@@ -834,7 +834,8 @@
     // Finish up the tasks and advance time again. Note that we must finish
     // the task since FakeJobInProgress does not properly maintain running
     // tasks, so the scheduler will always get an empty task list from
-    // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+    // the JobInProgress's getTasks(TaskType.MAP)/getTasks(TaskType.REDUCE) and 
+    // think they finished.
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");
@@ -942,7 +943,8 @@
     // Finish up the tasks and advance time again. Note that we must finish
     // the task since FakeJobInProgress does not properly maintain running
     // tasks, so the scheduler will always get an empty task list from
-    // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+    // the JobInProgress's getTasks(TaskType.MAP)/getTasks(TaskType.REDUCE) and
+    // think they finished.
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jan 19
15:18:07 2010
@@ -645,6 +645,9 @@
     
     jobHistory.logEvent(jie, jobId);
    
+    // Log the number of map and reduce tasks
+    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 
+             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
 
   // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
@@ -731,6 +734,7 @@
   
   synchronized void initSetupCleanupTasks(String jobFile) {
     if (!jobSetupCleanupNeeded) {
+      LOG.info("Setup/Cleanup not needed for job " + jobId);
       // nothing to initialize
       return;
     }
@@ -879,36 +883,42 @@
     return launchedSetup;
   }
 
-  /**
-   * Get the list of map tasks
-   * @return the raw array of maps for this job
-   */
-  TaskInProgress[] getMapTasks() {
-    return maps;
-  }
-    
-  /**
-   * Get the list of cleanup tasks
-   * @return the array of cleanup tasks for the job
-   */
-  TaskInProgress[] getCleanupTasks() {
-    return cleanup;
-  }
-  
-  /**
-   * Get the list of setup tasks
-   * @return the array of setup tasks for the job
-   */
-  TaskInProgress[] getSetupTasks() {
-    return setup;
-  }
-  
-  /**
-   * Get the list of reduce tasks
-   * @return the raw array of reduce tasks for this job
-   */
-  TaskInProgress[] getReduceTasks() {
-    return reduces;
+  /** 
+   * Get all the tasks of the desired type in this job.
+   * @param type {@link TaskType} of the tasks required
+   * @return An array of {@link TaskInProgress} matching the given type. 
+   *         Returns an empty array if no tasks are found for the given type.  
+   */
+  TaskInProgress[] getTasks(TaskType type) {
+    TaskInProgress[] tasks = null;
+    switch (type) {
+      case MAP:
+      {
+        tasks = maps;
+      }
+      break;
+      case REDUCE:
+      {
+        tasks = reduces;
+      }
+      break;
+      case JOB_SETUP: 
+      {
+        tasks = setup;
+      }
+      break;
+      case JOB_CLEANUP:
+      {
+        tasks = cleanup;
+      }
+      break;
+      default:
+      {
+          tasks = new TaskInProgress[0];
+      }
+      break;
+    }
+    return tasks;
   }
 
   /**
@@ -3458,11 +3468,11 @@
                "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
                "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
                "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
-               "numMaps" + EQUALS + job.getMapTasks().length + 
+               "numMaps" + EQUALS + job.getTasks(TaskType.MAP).length + 
                            StringUtils.COMMA +
                "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() + 
                                   StringUtils.COMMA +
-               "numReduces" + EQUALS + job.getReduceTasks().length + 
+               "numReduces" + EQUALS + job.getTasks(TaskType.REDUCE).length + 
                               StringUtils.COMMA +
                "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() + 
                                      StringUtils.COMMA +

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jan 19 15:18:07
2010
@@ -1768,8 +1768,7 @@
   // and TaskInProgress
   ///////////////////////////////////////////////////////
   void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
-    LOG.info("Adding task " + 
-      (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") + 
+    LOG.info("Adding task (" + tip.getAttemptType(taskid) + ") " + 
       "'"  + taskid + "' to tip " + 
       tip.getTIPId() + ", for tracker '" + taskTracker + "'");
 
@@ -1802,9 +1801,10 @@
     }
 
     // taskid --> TIP
-    taskidToTIPMap.remove(taskid);
-        
-    LOG.debug("Removing task '" + taskid + "'");
+    if (taskidToTIPMap.remove(taskid) != null) {
+      // log the task removal in case of success
+      LOG.info("Removing task '" + taskid + "'");
+    }
   }
     
   /**
@@ -1833,7 +1833,7 @@
    * @param job the completed job
    */
   void markCompletedJob(JobInProgress job) {
-    for (TaskInProgress tip : job.getSetupTasks()) {
+    for (TaskInProgress tip : job.getTasks(TaskType.JOB_SETUP)) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -1843,7 +1843,7 @@
         }
       }
     }
-    for (TaskInProgress tip : job.getMapTasks()) {
+    for (TaskInProgress tip : job.getTasks(TaskType.MAP)) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -1855,7 +1855,7 @@
         }
       }
     }
-    for (TaskInProgress tip : job.getReduceTasks()) {
+    for (TaskInProgress tip : job.getTasks(TaskType.REDUCE)) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -1883,8 +1883,10 @@
     if (markedTaskSet != null) {
       for (TaskAttemptID taskid : markedTaskSet) {
         removeTaskEntry(taskid);
-        LOG.info("Removed completed task '" + taskid + "' from '" + 
-                 taskTracker + "'");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removed marked completed task '" + taskid + "' from '" + 
+                    taskTracker + "'");
+        }
       }
       // Clear
       trackerToMarkedTasksMap.remove(taskTracker);
@@ -1898,15 +1900,16 @@
    * 
    * @param job the job about to be 'retired'
    */
-  synchronized private void removeJobTasks(JobInProgress job) { 
-    for (TaskInProgress tip : job.getMapTasks()) {
-      for (TaskStatus taskStatus : tip.getTaskStatuses()) {
-        removeTaskEntry(taskStatus.getTaskID());
-      }
-    }
-    for (TaskInProgress tip : job.getReduceTasks()) {
-      for (TaskStatus taskStatus : tip.getTaskStatuses()) {
-        removeTaskEntry(taskStatus.getTaskID());
+  synchronized void removeJobTasks(JobInProgress job) { 
+    // iterate over all the task types
+    for (TaskType type : TaskType.values()) {
+      // iterate over all the tips of the type under consideration
+      for (TaskInProgress tip : job.getTasks(type)) {
+        // iterate over all the task-ids in the tip under consideration
+        for (TaskAttemptID id : tip.getAllTaskAttemptIDs()) {
+          // remove the task-id entry from the jobtracker
+          removeTaskEntry(id);
+        }
       }
     }
   }
@@ -3008,6 +3011,9 @@
       }
     }
     myInstrumentation.submitJob(job.getJobConf(), jobId);
+    LOG.info("Job " + jobId + " added successfully for user '" 
+             + job.getJobConf().getUser() + "' to queue '" 
+             + job.getJobConf().getQueueName() + "'");
     return job.getStatus();
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jan 19
15:18:07 2010
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -319,6 +320,32 @@
   }
     
   /**
+   * Returns the {@link TaskType} of the {@link TaskAttemptID} passed. 
+   * The type of an attempt is determined by the nature of the task and not its 
+   * id. 
+   * For example,
+   * - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it 
+   *   has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP 
+   *   instead of MAP.
+   * - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is
+   *   now supposed to do the task-level cleanup. In such a case this attempt 
+   *   will be of type TASK_CLEANUP instead of REDUCE.
+   */
+  TaskType getAttemptType (TaskAttemptID id) {
+    if (isCleanupAttempt(id)) {
+      return TaskType.TASK_CLEANUP;
+    } else if (isJobSetupTask()) {
+      return TaskType.JOB_SETUP;
+    } else if (isJobCleanupTask()) {
+      return TaskType.JOB_CLEANUP;
+    } else if (isMapTask()) {
+      return TaskType.MAP;
+    } else {
+      return TaskType.REDUCE;
+    }
+  }
+  
+  /**
    * Is the Task associated with taskid is the first attempt of the tip? 
    * @param taskId
    * @return Returns true if the Task is the first attempt of the tip
@@ -820,6 +847,13 @@
   }
 
   /**
+   * Get all the {@link TaskAttemptID}s in this {@link TaskInProgress}
+   */
+  TaskAttemptID[] getAllTaskAttemptIDs() {
+    return tasks.toArray(new TaskAttemptID[tasks.size()]);
+  }
+  
+  /**
    * Get the status of the specified task
    * @param taskid
    * @return

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jan 19 15:18:07
2010
@@ -1397,7 +1397,7 @@
    * @return false if the tracker was unknown
    * @throws IOException
    */
-  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+  HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
     boolean sendCounters;
     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Tue
Jan 19 15:18:07 2010
@@ -189,7 +189,7 @@
       conf.set(MRConfig.LOCAL_DIR, localPath.toString());
       LOG.info(MRConfig.LOCAL_DIR + " is " +  localPath);
       try {
-        tt = new TaskTracker(conf);
+        tt = createTaskTracker(conf);
         isInitialized = true;
       } catch (Throwable e) {
         isDead = true;
@@ -199,6 +199,13 @@
     }
         
     /**
+     * Creates a default {@link TaskTracker} using the conf passed.
+     */
+    TaskTracker createTaskTracker(JobConf conf) throws IOException {
+      return new TaskTracker(conf);
+    }
+    
+    /**
      * Create and run the task tracker.
      */
     public void run() {
@@ -682,6 +689,13 @@
     TaskTrackerRunner taskTracker;
     taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
     
+    addTaskTracker(taskTracker);
+  }
+  
+  /**
+   * Add a task-tracker to the Mini-MR cluster.
+   */
+  void addTaskTracker(TaskTrackerRunner taskTracker) {
     Thread taskTrackerThread = new Thread(taskTracker);
     taskTrackerList.add(taskTracker);
     taskTrackerThreadList.add(taskTrackerThread);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Tue
Jan 19 15:18:07 2010
@@ -25,25 +25,41 @@
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster.TaskTrackerRunner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 /**
  * Test if the job retire works fine. 
  */
 public class TestJobRetire extends TestCase {
+  static final Log LOG = LogFactory.getLog(TestJobRetire.class);
   static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "job-expiry-testing");
 
+  private MiniMRCluster startCluster(JobConf conf, int numTrackers) 
+  throws IOException {
+    conf.setBoolean(JTConfig.JT_RETIREJOBS, true);
+    conf.setLong(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1);
+    return new MiniMRCluster(0, 0, numTrackers, "file:///", 1, null, null, null,
+                             conf, 0);
+  }
+  
   public void testJobRetire() throws Exception {
     MiniMRCluster mr = null;
     try {
       JobConf conf = new JobConf();
-
-      conf.setBoolean(JTConfig.JT_RETIREJOBS, true);
-      conf.setLong(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0);
+      mr = startCluster(conf, 1);
+      
       JobConf jobConf = mr.createJobConf();
       JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
       
@@ -61,6 +77,7 @@
           1, jobtracker.getAllJobs().length);
     } finally {
       if (mr != null) { mr.shutdown();}
+      FileUtil.fullyDelete(new File(testDir.toString()));
     }
   }
 
@@ -72,13 +89,9 @@
     assertTrue(rj.isSuccessful());
     JobID id = rj.getID();
 
-    JobInProgress job = jobtracker.getJob(id);
     //wait for job to get retired
-    for (int i = 0; i < 10 && job != null; i++) {
-      UtilsForTests.waitFor(1000);
-      job = jobtracker.getJob(id);
-    }
-    assertNull("Job did not retire", job);
+    waitTillRetire(id, jobtracker);
+    
     assertTrue("History url not set", rj.getHistoryUrl() != null && 
     rj.getHistoryUrl().length() > 0);
     assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));
@@ -105,4 +118,245 @@
     return id;
   }
 
+  // wait till the job retires
+  private void waitTillRetire(JobID id, JobTracker jobtracker) {
+    JobInProgress job = jobtracker.getJob(id);
+    //wait for job to get retired
+    for (int i = 0; i < 10 && job != null; i++) {
+      UtilsForTests.waitFor(1000);
+      job = jobtracker.getJob(id);
+    }
+    assertNull("Job did not retire", job);
+  }
+  
+  /**
+   * Custom TaskTracker which waits forever after a successful contact to 
+   * the JobTracker.
+   */
+  class WaitingTaskTracker extends TaskTracker {
+    
+    WaitingTaskTracker(JobConf conf) throws IOException {
+      super(conf);
+    }
+    
+    @Override
+    HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+      HeartbeatResponse response = super.transmitHeartBeat(now);
+      LOG.info("WaitingTaskTracker waiting");
+      // wait forever
+      UtilsForTests.waitFor(Long.MAX_VALUE);
+      throw new IOException ("WaitingTaskTracker interrupted. Bailing out");
+    }
+  }
+  
+  /**
+   * Test job retire with tasks that report their *first* status only after the
+   * job retires.
+   * Steps :
+   *  - Start a mini-mr cluster with 1 task-tracker having only map slots.
+   *    Note that this task-tracker will take care of setup/cleanup and map 
+   *    tasks.
+   *  - Submit a job with 1 map task and 1 reduce task
+   *  - Wait for the job to finish the map task
+   *  - Start a 2nd tracker that waits for a long time after contacting the JT.
+   *  - Wait for the 2nd tracker to get stuck
+   *  - Kill the job
+   *  - Wait for the job to retire
+   *  - Check if the tip mappings are cleaned up. 
+   */
+  public void testJobRetireWithUnreportedTasks() throws Exception {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+      conf.setInt(TTConfig.TT_REDUCE_SLOTS, 0);     
+      mr = startCluster(conf, 1);
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+      
+      // submit a job
+      Path inDir = new Path(testDir, "in-1");
+      Path outDir = new Path(testDir, "out-1");
+      JobConf jConf = mr.createJobConf();
+      FileInputFormat.setInputPaths(jConf, new Path[] {inDir});
+      FileOutputFormat.setOutputPath(jConf, outDir);
+      SleepJob sleepJob = new SleepJob();
+      sleepJob.setConf(jConf);
+      Job job = sleepJob.createJob(1, 1, 0, 1, 0, 1);
+
+      job.submit();
+      JobID id = JobID.downgrade(job.getStatus().getJobID());
+      JobInProgress jip = jobtracker.getJob(id);
+      
+      // wait 100 secs for the map to complete
+      for (int i = 0; i < 100 && (jip.finishedMaps() < 1); i++) {
+        UtilsForTests.waitFor(1000);
+      }
+      assertEquals(jip.finishedMaps(), 1);
+      
+      // start a tracker that will wait
+      LOG.info("Adding a waiting tracker");
+      TaskTrackerRunner testTrackerRunner = 
+        mr.new TaskTrackerRunner(1, 1, null, mr.createJobConf()) {
+        @Override
+        TaskTracker createTaskTracker(JobConf conf) throws IOException {
+          return new WaitingTaskTracker(conf);
+        }
+      };
+      mr.addTaskTracker(testTrackerRunner);
+      LOG.info("Waiting tracker added");
+      
+      WaitingTaskTracker testTT = 
+        (WaitingTaskTracker)testTrackerRunner.getTaskTracker();
+      
+      // wait 100 secs for the newly started task-tracker to join
+      for (int i = 0; i < 1000 && (jobtracker.taskTrackers().size() < 2); i++) {
+        UtilsForTests.waitFor(100);
+      }
+      assertEquals(jobtracker.taskTrackers().size(), 2);
+      LOG.info("Cluster is now ready");
+      
+      // stop the test-tt as its no longer required
+      mr.stopTaskTracker(mr.getTaskTrackerID(testTT.getName()));
+      
+      // check if a reduce task got scheduled or not
+      assertEquals("Waiting tracker joined but no reduce task got scheduled", 
+                   1, jip.runningReduces());
+      
+      // kill the job
+      job.killJob();
+      
+      // check if the reduce task attempt status is missing
+      TaskInProgress tip = jip.getTasks(TaskType.REDUCE)[0]; 
+      assertNull(tip.getTaskStatus(tip.getAllTaskAttemptIDs()[0]));
+      
+      // wait for the job to retire
+      waitTillRetire(id, jobtracker);
+      
+      // check the taskidToTIPMap
+      for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
+        LOG.info("TaskidToTIP mapping left over : " + tid);
+      }
+      assertEquals("'taskid' to TIP mapping still exists", 
+                   0, jobtracker.taskidToTIPMap.size());
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      FileUtil.fullyDelete(new File(testDir.toString()));
+    }
+  }
+  
+  /**
+   * (Mock)Test JobTracker.removeJobTasks() which is called only when the job 
+   * retires.
+   */
+  public void testJobRemoval() throws Exception {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      mr = startCluster(conf, 0);
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+      
+      // test map task removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.MAP);
+      // test reduce task removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.REDUCE);
+      // test job setup removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.JOB_SETUP);
+      // test job cleanup removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.JOB_CLEANUP);
+    } finally {
+      if (mr != null) { mr.shutdown();}
+      // cleanup
+      FileUtil.fullyDelete(new File(testDir.toString()));
+    }
+  }
+ 
+  // create a new job and add it to the jobtracker
+  private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) {
+    // submit a job in a fake manner
+    // get the new job-id
+    JobID id = 
+      new JobID(jobtracker.getTrackerIdentifier(), jobtracker.jobs.size() + 1);
+    // create a JobInProgress for this fake job
+    JobInProgress jip = new JobInProgress(id, conf, jobtracker);
+    
+    // insert this fake completed job in the jobtracker
+    jobtracker.jobs.put(id, jip);
+    
+    return jip;
+  }
+  
+  // create a new TaskInProgress and make it running by adding it to jobtracker
+  private TaskInProgress createAndAddTIP(JobTracker jobtracker, 
+                                         JobInProgress jip, TaskType type) {
+    JobConf conf = jip.getJobConf();
+    JobID id = jip.getJobID();
+    // now create a fake tip for this fake job
+    TaskInProgress tip = null;
+    if (type == TaskType.MAP) {
+      tip = new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, 
+                               jobtracker, conf, jip, 0, 1);
+      jip.maps = new TaskInProgress[] {tip};
+    } else if (type == TaskType.REDUCE) {
+      tip = new TaskInProgress(id, "dummy", jip.desiredMaps(), 0, 
+                               jobtracker, conf, jip, 1);
+      jip.reduces = new TaskInProgress[] {tip};
+    } else if (type == TaskType.JOB_SETUP) {
+      tip = 
+        new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, 
+                           jobtracker, conf, jip, 0, 1);
+      jip.setup = new TaskInProgress[] {tip};
+    } else if (type == TaskType.JOB_CLEANUP) {
+      tip = 
+        new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, 
+                           jobtracker, conf, jip, 0, 1);
+      jip.cleanup = new TaskInProgress[] {tip};
+    }
+    return tip;
+  }
+  
+  // create a new Task for the given tip and make it running
+  private TaskAttemptID createAndAddAttempt(TaskInProgress tip, int attemptId) {
+    // create a fake attempt for this fake task
+    TaskAttemptID taskid = new TaskAttemptID(tip.getTIPId(), attemptId);
+    
+    // insert this fake task into the jobtracker by making it running
+    tip.addRunningTask(taskid, "test-tt");
+    
+    return taskid;
+  }
+  
+  // Mock a job run such that the jobtracker is in a state similar to that 
+  // resulting from an actual job run.
+  // Steps :
+  //   - generate a new job-id
+  //   - create and add a JobInProgress object using the fake job-id
+  //   - create and add a fake tip of the passed type 't' under the fake job
+  //     Note that t can be a MAP or a REDUCE or a JOB_SETUP or a JOB_CLEANUP.
+  //   - create and add a fake attempt under the fake tip
+  //   - remove the job from the jobtracker
+  //   - check if the fake attempt is removed from the jobtracker
+  private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf, 
+                                  TaskType type) {
+    // create and submit a job
+    JobInProgress jip = createAndAddJob(jobtracker, conf);
+    // create and add a tip
+    TaskInProgress tip = createAndAddTIP(jobtracker, jip, type);
+    // create and add an attempt
+    TaskAttemptID taskid = createAndAddAttempt(tip, 0);
+    
+    // this fake attempt should not have any status
+    assertNull(tip.getTaskStatus(taskid));
+    
+    // remove the job tasks for this fake job from the jobtracker
+    jobtracker.removeJobTasks(jip);
+    
+    // check the taskidToTIPMap
+    for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
+      LOG.info("TaskidToTIP : " + tid);
+    }
+    
+    // check if the fake attempt is removed from the jobtracker
+    assertEquals("'taskid' to TIP mapping still exists", 
+                 0, jobtracker.taskidToTIPMap.size());
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
Tue Jan 19 15:18:07 2010
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
@@ -155,7 +156,7 @@
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
     // get the running setup task id
-    TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
+    TaskAttemptID setupID = getRunningTaskID(jip.getTasks(TaskType.JOB_SETUP));
     if (commandLineKill) {
       killTaskFromCommandLine(job, setupID, jt);
     } else {
@@ -172,7 +173,8 @@
       } catch (InterruptedException ie) {}
     }
     // get the running cleanup task id
-    TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
+    TaskAttemptID cleanupID = 
+      getRunningTaskID(jip.getTasks(TaskType.JOB_CLEANUP));
     if (commandLineKill) {
       killTaskFromCommandLine(job, cleanupID, jt);
     } else {

Modified: hadoop/mapreduce/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobdetails.jsp?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobdetails.jsp Tue Jan 19 15:18:07 2010
@@ -26,6 +26,7 @@
   import="java.util.*"
   import="java.text.DecimalFormat"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapreduce.TaskType"
   import="org.apache.hadoop.util.*"
   import="org.apache.hadoop.fs.Path"
   import="org.apache.hadoop.mapreduce.jobhistory.JobHistory"
@@ -240,7 +241,8 @@
    out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">"
              + profile.getJobFile() + "</a><br>\n");
     out.print("<b>Job Setup:</b>");
-    printJobLevelTaskSummary(out, jobId, "setup", job.getSetupTasks());
+    printJobLevelTaskSummary(out, jobId, "setup", 
+                             job.getTasks(TaskType.JOB_SETUP));
     out.print("<br>\n");
     if (runState == JobStatus.RUNNING) {
       out.print("<b>Status:</b> Running<br>\n");
@@ -272,7 +274,8 @@
       }
     }
     out.print("<b>Job Cleanup:</b>");
-    printJobLevelTaskSummary(out, jobId, "cleanup", job.getCleanupTasks());
+    printJobLevelTaskSummary(out, jobId, "cleanup", 
+                             job.getTasks(TaskType.JOB_CLEANUP));
     out.print("<br>\n");
     if (flakyTaskTrackers > 0) {
       out.print("<b>Black-listed TaskTrackers:</b> " + 
@@ -291,9 +294,9 @@
               "<th><a href=\"jobfailures.jsp?jobid=" + jobId + 
               "\">Failed/Killed<br>Task Attempts</a></th></tr>\n");
     printTaskSummary(out, jobId, "map", status.mapProgress(), 
-                     job.getMapTasks());
+                     job.getTasks(TaskType.MAP));
     printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
-                     job.getReduceTasks());
+                     job.getTasks(TaskType.REDUCE));
     out.print("</table>\n");
     
     %>
@@ -368,7 +371,7 @@
       style="width:100%" type="image/svg+xml" pluginspage="http://www.adobe.com/svg/viewer/install/" />
 <%}%>
 
-<%if(job.getReduceTasks().length > 0) { %>
+<%if(job.getTasks(TaskType.REDUCE).length > 0) { %>
 <hr>Reduce Completion Graph -
 <%if("off".equals(session.getAttribute("reduce.graph"))) { %>
<a href="/jobdetails.jsp?jobid=<%=jobId%>&refresh=<%=refresh%>&reduce.graph=on"> open </a>

Modified: hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp?rev=900815&r1=900814&r2=900815&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp Tue Jan 19 15:18:07 2010
@@ -24,6 +24,7 @@
   import="java.io.*"
   import="java.util.*"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapreduce.TaskType"
   import="org.apache.hadoop.util.*"
 %>
 
@@ -147,13 +148,13 @@
    out.print("<tr><th>Attempt</th><th>Task</th><th>Machine</th><th>State</th>" +
              "<th>Error</th><th>Logs</th></tr>\n");
     if (includeMap) {
-      TaskInProgress[] tips = job.getMapTasks();
+      TaskInProgress[] tips = job.getTasks(TaskType.MAP);
       for(int i=0; i < tips.length; ++i) {
         printFailedAttempts(out, tracker, jobId, tips[i], state);
       }
     }
     if (includeReduce) {
-      TaskInProgress[] tips = job.getReduceTasks();
+      TaskInProgress[] tips = job.getTasks(TaskType.REDUCE);
       for(int i=0; i < tips.length; ++i) {
         printFailedAttempts(out, tracker, jobId, tips[i], state);
       }



Mime
View raw message