hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r806508 - in /hadoop/mapreduce/trunk: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/ src/java/org/apache/hadoop/mapred/ src/test/ src/test/mapred/org/apache/hadoop/mapred/
Date Fri, 21 Aug 2009 11:45:37 GMT
Author: sharad
Date: Fri Aug 21 11:45:37 2009
New Revision: 806508

URL: http://svn.apache.org/viewvc?rev=806508&view=rev
Log:
MAPREDUCE-870. Remove the job retire thread and the associated config parameters.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred-site.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 21 11:45:37 2009
@@ -218,6 +218,9 @@
 
     MAPREDUCE-712. Minor efficiency tweaks to RandomTextWriter. (cdouglas)
 
+    MAPREDUCE-870. Remove the job retire thread and the associated 
+    config parameters. (sharad)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Fri Aug 21 11:45:37 2009
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
@@ -65,7 +66,13 @@
     utilTest.checkUserDir();
     utilTest.redirectIfAntJunit();
   }
-  
+
+  protected void setUp() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("mapred.job.tracker.retire.jobs", "false");
+    startCluster(true, props);
+  }
+
   private void createInput() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), 
         "text.txt"));

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java Fri Aug 21 11:45:37 2009
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TaskReport;
 
@@ -78,8 +79,10 @@
   public void testStreamingStatus() throws Exception {
     MiniMRCluster mr = null;
     FileSystem fs = null;
+    JobConf conf = new JobConf();
+    conf.setBoolean("mapred.job.tracker.retire.jobs", false);
     try {
-      mr = new MiniMRCluster(1, "file:///", 3);
+      mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
 
       Path inFile = new Path(INPUT_FILE);
       fs = inFile.getFileSystem(mr.createJobConf());

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Fri Aug 21 11:45:37 2009
@@ -341,13 +341,6 @@
 </property>
 
 <property>
-  <name>mapred.jobtracker.completeuserjobs.maximum</name>
-  <value>100</value>
-  <description>The maximum number of complete jobs per user to keep around 
-  before delegating them to the job history.</description>
-</property>
-
-<property>
   <name>mapred.job.tracker.retiredjobs.cache.size</name>
   <value>1000</value>
   <description>The number of retired job status to keep in the cache.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Fri Aug 21 11:45:37 2009
@@ -188,6 +188,9 @@
     }
 
     void moveToDone(final JobID id) {
+      if (disableHistory) {
+        return;
+      }
       final List<Path> paths = new ArrayList<Path>();
       final Path historyFile = fileManager.getHistoryFile(id);
       if (historyFile == null) {
@@ -218,19 +221,18 @@
                     new FsPermission(HISTORY_FILE_PERMISSION));
               }
             }
-
-            String historyFileDonePath = null;
-            if (historyFile != null) {
-              historyFileDonePath = new Path(DONE, 
-                  historyFile.getName()).toString();
-            }
-            jobTracker.historyFileCopied(id, historyFileDonePath);
-            
-            //purge the job from the cache
-            fileManager.purgeJob(id);
           } catch (Throwable e) {
             LOG.error("Unable to move history file to DONE folder.", e);
           }
+          String historyFileDonePath = null;
+          if (historyFile != null) {
+            historyFileDonePath = new Path(DONE, 
+                historyFile.getName()).toString();
+          }
+          jobTracker.retireJob(id, historyFileDonePath);
+          
+          //purge the job from the cache
+          fileManager.purgeJob(id);
         }
 
       });

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Aug 21 11:45:37 2009
@@ -111,9 +111,7 @@
   }
 
   private final long tasktrackerExpiryInterval;
-  private final long retireJobInterval;
-  private final long retireJobCheckInterval;
-
+  
   // The interval after which one fault of a tracker will be discarded,
   // if there are no faults during this. 
   private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
@@ -166,19 +164,6 @@
     }
   }
 
-  /**
-   * The maximum no. of 'completed' (successful/failed/killed)
-   * jobs kept in memory per-user. 
-   */
-  final int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
-
-   /**
-    * The minimum time (in ms) that a job's information has to remain
-    * in the JobTracker's memory before it is retired.
-    */
-  final int MIN_TIME_BEFORE_RETIRE;
-
-
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
@@ -433,29 +418,58 @@
     }
   }
 
-  synchronized void historyFileCopied(JobID jobid, String historyFile) {
-    JobStatus status = getJobStatus(jobid);
-    if (status != null) {
-      String trackingUrl = "";
-      if (historyFile != null) {
-        status.setHistoryFile(historyFile);
-        try {
-          trackingUrl = "http://" + getJobTrackerMachine() + ":" + 
-            getInfoPort() + "/jobdetailshistory.jsp?jobid=" + 
-            jobid + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-          LOG.warn("Could not create trackingUrl", e);
+  synchronized void retireJob(JobID jobid, String historyFile) {
+    synchronized (jobs) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        JobStatus status = job.getStatus();
+        
+        //set the historyfile and update the tracking url
+        String trackingUrl = "";
+        if (historyFile != null) {
+          status.setHistoryFile(historyFile);
+          try {
+            trackingUrl = "http://" + getJobTrackerMachine() + ":" + 
+              getInfoPort() + "/jobdetailshistory.jsp?jobid=" + 
+              jobid + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8");
+          } catch (UnsupportedEncodingException e) {
+            LOG.warn("Could not create trackingUrl", e);
+          }
+        }
+        status.setTrackingUrl(trackingUrl);
+        // clean up job files from the local disk
+        JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
+
+        //this configuration is primarily for testing
+        //test cases can set this to false to validate job data structures on 
+        //job completion
+        boolean retireJob = 
+          conf.getBoolean("mapred.job.tracker.retire.jobs", true);
+
+        if (retireJob) {
+          //purge the job from memory
+          removeJobTasks(job);
+          jobs.remove(job.getProfile().getJobID());
+          for (JobInProgressListener l : jobInProgressListeners) {
+            l.jobRemoved(job);
+          }
+
+          String jobUser = job.getProfile().getUser();
+          LOG.info("Retired job with id: '" + 
+                   job.getProfile().getJobID() + "' of user '" +
+                   jobUser + "'");
+
+          //add the job status to retired cache
+          retireJobs.addToCache(job.getStatus());
         }
       }
-      status.setTrackingUrl(trackingUrl);
     }
   }
 
   ///////////////////////////////////////////////////////
   // Used to remove old finished Jobs that have been around for too long
   ///////////////////////////////////////////////////////
-  class RetireJobs implements Runnable {
-    int runCount = 0;
+  class RetireJobs {
     private final Map<JobID, JobStatus> jobIDStatusMap = 
       new HashMap<JobID, JobStatus>();
     private final LinkedList<JobStatus> jobStatusQ = 
@@ -482,74 +496,8 @@
     synchronized LinkedList<JobStatus> getAll() {
       return (LinkedList<JobStatus>) jobStatusQ.clone();
     }
-
-    /**
-     * The run method lives for the life of the JobTracker,
-     * and removes Jobs that are not still running, but which
-     * finished a long time ago.
-     */
-    public void run() {
-      while (true) {
-        ++runCount;
-        try {
-          Thread.sleep(retireJobCheckInterval);
-          List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long now = clock.getTime();
-          long retireBefore = now - retireJobInterval;
-
-          synchronized (jobs) {
-            for(JobInProgress job: jobs.values()) {
-              if (job.getStatus().getRunState() != JobStatus.RUNNING &&
-                  job.getStatus().getRunState() != JobStatus.PREP &&
-                  (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
-                  (job.getFinishTime()  < retireBefore)) {
-                retiredJobs.add(job);
-              }
-            }
-          }
-          if (!retiredJobs.isEmpty()) {
-            synchronized (JobTracker.this) {
-              synchronized (jobs) {
-                synchronized (taskScheduler) {
-                  for (JobInProgress job: retiredJobs) {
-                    removeJobTasks(job);
-                    jobs.remove(job.getProfile().getJobID());
-                    for (JobInProgressListener l : jobInProgressListeners) {
-                      l.jobRemoved(job);
-                    }
-                    String jobUser = job.getProfile().getUser();
-                    synchronized (userToJobsMap) {
-                      ArrayList<JobInProgress> userJobs =
-                        userToJobsMap.get(jobUser);
-                      synchronized (userJobs) {
-                        userJobs.remove(job);
-                      }
-                      if (userJobs.isEmpty()) {
-                        userToJobsMap.remove(jobUser);
-                      }
-                    }
-                    LOG.info("Retired job with id: '" + 
-                             job.getProfile().getJobID() + "' of user '" +
-                             jobUser + "'");
-
-                    // clean up job files from the local disk
-                    JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
-                    addToCache(job.getStatus());
-                  }
-                }
-              }
-            }
-          }
-        } catch (InterruptedException t) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error in retiring job:\n" +
-                    StringUtils.stringifyException(t));
-        }
-      }
-    }
   }
-  
+
   enum ReasonForBlackListing {
     EXCEEDING_FAILURES,
     NODE_UNHEALTHY
@@ -1724,10 +1672,6 @@
   // All the known jobs.  (jobid->JobInProgress)
   Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
 
-  // (user -> list of JobInProgress)
-  TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
-    new TreeMap<String, ArrayList<JobInProgress>>();
-    
   // (trackerID --> list of jobs to cleanup)
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
     new HashMap<String, Set<JobID>>();
@@ -1784,7 +1728,6 @@
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
   RetireJobs retireJobs = new RetireJobs();
-  Thread retireJobsThread = null;
   final int retiredJobsCacheSize;
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
@@ -1868,13 +1811,8 @@
     //
     tasktrackerExpiryInterval = 
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
-    retireJobInterval = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
-    retireJobCheckInterval = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
     retiredJobsCacheSize = 
       conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
-    // min time before retire
-    MIN_TIME_BEFORE_RETIRE = conf.getInt("mapred.jobtracker.retirejob.interval.min", 60000);
-    MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
     NUM_HEARTBEATS_IN_SECOND = 
@@ -2186,8 +2124,6 @@
     
     startExpireTrackersThread();
 
-    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
-    this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
     if (completedJobStatusStore.isActive()) {
@@ -2224,15 +2160,6 @@
 
     stopExpireTrackersThread();
 
-    if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
-      LOG.info("Stopping retirer");
-      this.retireJobsThread.interrupt();
-      try {
-        this.retireJobsThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
     if (taskScheduler != null) {
       taskScheduler.terminate();
     }
@@ -2403,11 +2330,7 @@
   /**
    * Call {@link #removeTaskEntry(String)} for each of the
    * job's tasks.
-   * When the JobTracker is retiring the long-completed
-   * job, either because it has outlived {@link #retireJobInterval}
-   * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs 
-   * has been reached, we can afford to nuke all it's tasks; a little
-   * unsafe, but practically feasible. 
+   * When the job is retiring we can afford to nuke all it's tasks
    * 
    * @param job the job about to be 'retired'
    */
@@ -2459,8 +2382,6 @@
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     
-    long now = clock.getTime();
-    
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
 
@@ -2472,74 +2393,6 @@
         }
       }
     }
-    
-    // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
-    // in memory; information about the purged jobs is available via
-    // JobHistory.
-    synchronized (jobs) {
-      synchronized (taskScheduler) {
-        synchronized (userToJobsMap) {
-          String jobUser = job.getProfile().getUser();
-          if (!userToJobsMap.containsKey(jobUser)) {
-            userToJobsMap.put(jobUser, 
-                              new ArrayList<JobInProgress>());
-          }
-          ArrayList<JobInProgress> userJobs = 
-            userToJobsMap.get(jobUser);
-          synchronized (userJobs) {
-            // Add the currently completed 'job'
-            userJobs.add(job);
-
-            // Check if we need to retire some jobs of this user
-            while (userJobs.size() > 
-                   MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
-              JobInProgress rjob = userJobs.get(0);
-
-              // do not retire jobs that finished in the very recent past.
-              if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) {
-                break;
-              }
-                
-              // Cleanup all datastructures
-              int rjobRunState = 
-                rjob.getStatus().getRunState();
-              if (rjobRunState == JobStatus.SUCCEEDED || 
-                  rjobRunState == JobStatus.FAILED ||
-                  rjobRunState == JobStatus.KILLED) {
-                // Ok, this call to removeTaskEntries
-                // is dangerous is some very very obscure
-                // cases; e.g. when rjob completed, hit
-                // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
-                // limit and yet some task (taskid)
-                // wasn't complete!
-                removeJobTasks(rjob);
-                  
-                userJobs.remove(0);
-                jobs.remove(rjob.getProfile().getJobID());
-                for (JobInProgressListener listener : jobInProgressListeners) {
-                  listener.jobRemoved(rjob);
-                }
-                  
-                LOG.info("Retired job with id: '" + 
-                         rjob.getProfile().getJobID() + "' of user: '" +
-                         jobUser + "'");
-                // clean up job files from the local disk
-                JobHistory.JobInfo.cleanupJob(rjob.getProfile().getJobID());
-                retireJobs.addToCache(rjob.getStatus());
-              } else {
-                // Do not remove jobs that aren't complete.
-                // Stop here, and let the next pass take
-                // care of purging jobs.
-                break;
-              }
-            }
-          }
-          if (userJobs.isEmpty()) {
-            userToJobsMap.remove(jobUser);
-          }
-        }
-      }
-    }
   }
 
   ///////////////////////////////////////////////////////

Modified: hadoop/mapreduce/trunk/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred-site.xml?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/trunk/src/test/mapred-site.xml Fri Aug 21 11:45:37 2009
@@ -14,5 +14,9 @@
   <value>hosts.exclude</value>
   <description></description>
 </property>
-
+<property>
+  <name>mapred.job.tracker.retire.jobs</name>
+  <value>false</value>
+  <description></description>
+</property>
 </configuration>

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=806508&r1=806507&r2=806508&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Fri Aug 21 11:45:37 2009
@@ -22,296 +22,70 @@
 import java.io.IOException;
 
 import junit.framework.TestCase;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 
 /**
- * Test if the job expiry works fine. 
+ * Test if the job retire works fine. 
  */
 public class TestJobRetire extends TestCase {
   static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "job-expiry-testing");
-  private static final Log LOG = LogFactory.getLog(TestJobRetire.class);
-
-  private void testJobConfFile(JobID id, boolean exists) {
-    // get the job conf filename
-    String name = JobHistory.JobInfo.getLocalJobFilePath(id);
-    File file = new File(name);
- 
-    assertEquals("JobConf file check failed", exists, file.exists());
-  }
-
-  /** Test if the job after completion waits for atleast 
-   *  mapred.jobtracker.retirejob.interval.min amount of time.
-   */
-  public void testMinIntervalBeforeRetire() throws Exception {
-    MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 5000;
-    try {
-      FakeClock clock = new FakeClock();
-      JobConf conf = new JobConf();
-
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); //10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 5 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, 
-                             clock);
-      JobConf jobConf = mr.createJobConf();
-      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
-      Path inDir = new Path(testDir, "input");
-      Path outDir = new Path(testDir, "output");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is there in the memory for min time
-      assertTrue(rj.isSuccessful());
-
-      // snapshot expiry thread count
-      int snapshot = jobtracker.retireJobs.runCount;
-      clock.advance(max + 1); // adv to expiry max time
-
-      // wait for the thread to run
-      while (jobtracker.retireJobs.runCount == snapshot) {
-        // wait for the thread to run
-        UtilsForTests.waitFor(1000);
-      }
-
-      assertNotNull(jc.getJob(id));
-
-      //check that the job is not retired
-      assertFalse(jc.getJob(id).isRetired());
-      
-      // snapshot expiry thread count
-      snapshot = jobtracker.retireJobs.runCount;
-      clock.advance(min - max); // adv to expiry min time
-
-      while (jobtracker.retireJobs.runCount == snapshot) {
-        // wait for the thread to run
-        UtilsForTests.waitFor(1000);
-      }
-
-      // check if the job is missing
-      assertNull(jc.getJob(id));
-      assertNull(jobtracker.getJob(id));
-
-      testJobConfFile(id, false);
-    } finally {
-      if (mr != null) { mr.shutdown();}
-    }
-  }
 
-  /** Test if the job after completion get expired after
-   *  mapred.jobtracker.retirejob.interval amount after the time.
-   */
   public void testJobRetire() throws Exception {
     MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 20000;
     try {
-      FakeClock clock = new FakeClock();
       JobConf conf = new JobConf();
 
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, 
-                             clock);
+      conf.setBoolean("mapred.job.tracker.retire.jobs", true);
+      conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
+      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0);
       JobConf jobConf = mr.createJobConf();
       JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
       
       Path inDir = new Path(testDir, "input1");
       Path outDir = new Path(testDir, "output1");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is there in the memory for min time
-      assertTrue(rj.isSuccessful());
-
-      // snapshot expiry thread count
-      int snapshot = jobtracker.retireJobs.runCount;
-      clock.advance(max + 1); // adv to expiry max time
-
-      while (jobtracker.retireJobs.runCount == snapshot) {
-        // wait for the thread to run
-        UtilsForTests.waitFor(1000);
-      }
- 
-      // check if the job is missing
-      assertNull(jc.getJob(id));
-      assertNull(jobtracker.getJob(id));
 
-      testJobConfFile(id, false);
-    } finally {
-      if (mr != null) { mr.shutdown();}
-    }
-  }
+      JobID id1 = validateJobRetire(jobConf, inDir, outDir, jobtracker);
 
-  /** Test if the job after gets expired after
-   *  mapred.jobtracker.completeuserjobs.maximum jobs.
-   */
-  public void testMaxJobRetire() throws Exception {
-    MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 20000;
-    try {
-      FakeClock clock = new FakeClock();
-      JobConf conf = new JobConf();
-      
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 1);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, 
-                             clock);
-      JobConf jobConf = mr.createJobConf();
-      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-      
-      Path inDir = new Path(testDir, "input2.1");
-      Path outDir = new Path(testDir, "output2.1");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is successful
-      assertTrue(rj.isSuccessful());
-
-      clock.advance(min + 1); // adv to expiry min time
-
-      inDir = new Path(testDir, "input2.2");
-      outDir = new Path(testDir, "output2.2");
-      RunningJob rj2 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj2.waitForCompletion();
-      JobID id2 = rj2.getID();
-
-      // check if the job#1 is missing
-      assertNull(jc.getJob(id));
-      assertNull("Job still not missing from jobtracker", jobtracker.getJob(id));
-      
-      // check if the job#2 exists
-      assertNotNull(jc.getJob(id2));
-      assertNotNull("Job " + id2 + " missing at the jobtracker before expiry", 
-                    jobtracker.getJob(id2));
+      outDir = new Path(testDir, "output2");
+      JobID id2 = validateJobRetire(jobConf, inDir, outDir, jobtracker);
 
-      testJobConfFile(id, false);
-      testJobConfFile(id2, true);
+      assertNull("Job not removed from cache", jobtracker.getJobStatus(id1));
+
+      assertEquals("Total job in cache not correct", 
+          1, jobtracker.getAllJobs().length);
     } finally {
-      if (mr != null) {mr.shutdown();}
+      if (mr != null) { mr.shutdown();}
     }
   }
 
-  /** Test if the job after gets expired but basic info is cached with jobtracker
-   */
-  public void testRetiredJobCache() throws Exception {
-    MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 20000;
-    try {
-      FakeClock clock = new FakeClock();
-      JobConf conf = new JobConf();
+  private JobID validateJobRetire(JobConf jobConf, Path inDir, Path outDir, 
+      JobTracker jobtracker) throws IOException {
 
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 1);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 1);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0,
-                             clock);
-      JobConf jobConf = mr.createJobConf();
-      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
-      Path inDir = new Path(testDir, "input3.1");
-      Path outDir = new Path(testDir, "output3.1");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is successful
-      assertTrue(rj.isSuccessful());
-      JobStatus status1 = jobtracker.getJobStatus(id);
-
-      clock.advance(min + 1); // adv to expiry min time
-
-      inDir = new Path(testDir, "input3.2");
-      outDir = new Path(testDir, "output3.2");
-      RunningJob rj2 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj2.waitForCompletion();
-      JobID id2 = rj2.getID();
-      JobStatus status2 = jobtracker.getJobStatus(id2);
-
-      // check if the job#1 is missing in jt but cached status
-      assertNotNull("Job status missing from status cache", jc.getJob(id));
-      // check the status at jobtracker
-      assertEquals("Status mismatch for job " + id, status1.toString(), 
-                   jobtracker.getJobStatus(id).toString());
-      testRetiredCachedJobStatus(status1, rj);
-      assertNull("Job still not missing from jobtracker", jobtracker.getJob(id));
-
-      // check if the job#2 exists
-      assertNotNull(jc.getJob(id2));
-      // check the status .. 
-      
-      assertNotNull("Job " + id2 + " missing at the jobtracker before expiry",
-                    jobtracker.getJob(id2));
-
-      testJobConfFile(id, false);
-      testJobConfFile(id2, true);
-
-      clock.advance(min + 1); // adv to expiry min time
-
-      inDir = new Path(testDir, "input3.3");
-      outDir = new Path(testDir, "output3.3");
-      RunningJob rj3 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj3.waitForCompletion();
-      JobID id3 = rj3.getID();
-
-      // check if the job#1 is missing in all the caches
-      assertNull("Job status still in status cache", jc.getJob(id));
-      // check if the job#2 is missing in jt but cached status
-      assertNotNull(jc.getJob(id2));
-      assertEquals("Status mismatch for job " + id2, status2.toString(), 
-                   jobtracker.getJobStatus(id2).toString());
-      testRetiredCachedJobStatus(status2, rj2);
-      assertNull("Job " + id2 + " missing at the jobtracker before expiry",
-                 jobtracker.getJob(id2));
-      // check if the job#3 exists
-      assertNotNull(jc.getJob(id3));
-      assertNotNull("Job " + id3 + " missing at the jobtracker before expiry",
-                    jobtracker.getJob(id3));
-    } finally {
-      if (mr != null) {mr.shutdown();}
+    RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
+    rj.waitForCompletion();
+    assertTrue(rj.isSuccessful());
+    JobID id = rj.getID();
+
+    JobInProgress job = jobtracker.getJob(id);
+    //wait for job to get retired
+    for (int i = 0; i < 10 && job != null; i++) {
+      UtilsForTests.waitFor(1000);
+      job = jobtracker.getJob(id);
     }
+    assertNull("Job did not retire", job);
+    assertTrue("History url not set", rj.getHistoryUrl() != null && 
+    rj.getHistoryUrl().length() > 0);
+    assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));
+    
+    // get the job conf filename
+    String name = JobHistory.JobInfo.getLocalJobFilePath(id);
+    File file = new File(name);
+ 
+    assertFalse("JobConf file not deleted", file.exists());
+    return id;
   }
 
-  private static void testRetiredCachedJobStatus(JobStatus status, 
-                                                 RunningJob rj) 
-  throws IOException {
-    assertEquals(status.getJobID(), rj.getID());
-    assertEquals(status.mapProgress(), rj.mapProgress());
-    assertEquals(status.reduceProgress(), rj.reduceProgress());
-    assertEquals(status.setupProgress(), rj.setupProgress());
-    assertEquals(status.cleanupProgress(), rj.cleanupProgress());
-    assertEquals(status.getRunState(), rj.getJobState());
-    assertEquals(status.getJobName(), rj.getJobName());
-    assertEquals(status.getTrackingUrl(), rj.getTrackingURL());
-    assertEquals(status.isRetired(), true);
-  }
 }



Mime
View raw message