hadoop-mapreduce-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r789316 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date: Mon, 29 Jun 2009 13:21:31 GMT
Author: ddas
Date: Mon Jun 29 13:21:31 2009
New Revision: 789316

URL: http://svn.apache.org/viewvc?rev=789316&view=rev
Log:
MAPREDUCE-416. Moves the history file to a "done" folder whenever a job completes. Contributed
by Amar Kamat.
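
For context, the core of the change is a rename of the per-job history file (and its _conf.xml) from the running history log directory into a new "done" subfolder once the JobTracker retires the job: JobHistory.init() creates the folder, JobInfo.markCompleted() performs the move, and the HistoryCleaner now prunes the done folder instead of the log directory. The standalone sketch below is not part of this commit; it only illustrates the rename-into-"done" pattern using the public Hadoop FileSystem API, and the class name and command-line arguments are assumptions for illustration.

    // Illustrative sketch only (not part of r789316): shows the move-to-"done"
    // pattern that JobHistory.JobInfo.markCompleted() applies to history files.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MoveHistoryToDone {
      public static void main(String[] args) throws IOException {
        Path historyDir = new Path(args[0]);   // job history log directory (assumed argument)
        String historyFile = args[1];          // finished job's history file name (assumed argument)

        Configuration conf = new Configuration();
        FileSystem fs = historyDir.getFileSystem(conf);

        // The patch creates the "done" folder once, at JobHistory.init() time.
        Path doneDir = new Path(historyDir, "done");
        fs.mkdirs(doneDir);

        // Keep the file name, change only the parent directory.
        Path src = new Path(historyDir, historyFile);
        Path dst = new Path(doneDir, historyFile);
        if (fs.exists(src)) {
          fs.rename(src, dst);
        }
      }
    }

The JobTracker invokes markCompleted() only after all history writers for the job are closed, so the rename never races with open handles, and tests and the web UI now locate completed jobs via JobHistory.getCompletedJobHistoryLocation().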

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=789316&r1=789315&r2=789316&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jun 29 13:21:31 2009
@@ -35,6 +35,9 @@
     MAPREDUCE-502. Allow jobtracker to be configured with zero completed jobs
     in memory. (Amar Kamat via sharad)
 
+    MAPREDUCE-416. Moves the history file to a "done" folder whenever a job 
+    completes. (Amar Kamat via ddas)
+
   BUG FIXES
     HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=789316&r1=789315&r2=789316&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Mon Jun 29 13:21:31 2009
@@ -91,17 +91,80 @@
   public static final int JOB_NAME_TRIM_LENGTH = 50;
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
-  private static Map<String, ArrayList<PrintWriter>> openJobs = 
-                     new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false; 
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
   private static String jobtrackerHostname;
+  private static JobHistoryFilesManager fileManager = 
+    new JobHistoryFilesManager();
   final static FsPermission HISTORY_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0750); // rwxr-x---
   final static FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
   private static FileSystem JT_FS; // jobtracker's filesystem
+  private static Path DONE = null; // folder for completed jobs
+  
+  /**
+   * A class that manages all the files related to a job. For now 
+   *   - writers : list of open files
+   *   - job history filename
+   *   - job conf filename
+   */
+  private static class JobHistoryFilesManager {
+    // a private (virtual) folder for all the files related to a running job
+    private static class FilesHolder {
+      ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
+      Path historyFilename; // path of job history file
+      Path confFilename; // path of job's conf
+    }
+    
+    // cache from job-key to files associated with it.
+    private Map<JobID, FilesHolder> fileCache = 
+      new ConcurrentHashMap<JobID, FilesHolder>();
+
+    private FilesHolder getFileHolder(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      if (holder == null) {
+        holder = new FilesHolder();
+        fileCache.put(id, holder);
+      }
+      return holder;
+    }
+
+    void addWriter(JobID id, PrintWriter writer) {
+      FilesHolder holder = getFileHolder(id);
+      holder.writers.add(writer);
+    }
+
+    void setHistoryFile(JobID id, Path file) {
+      FilesHolder holder = getFileHolder(id);
+      holder.historyFilename = file;
+    }
+
+    void setConfFile(JobID id, Path file) {
+      FilesHolder holder = getFileHolder(id);
+      holder.confFilename = file;
+    }
+
+    ArrayList<PrintWriter> getWriters(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      return holder == null ? null : holder.writers;
+    }
+
+    Path getHistoryFile(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      return holder == null ? null : holder.historyFilename;
+    }
+
+    Path getConfFileWriters(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      return holder == null ? null : holder.confFilename;
+    }
+
+    void purgeJob(JobID id) {
+      fileCache.remove(id);
+    }
+  }
   /**
    * Record types are identifiers for each line of log in history files. 
    * A record type appears as the first token in a single line of log. 
@@ -164,6 +227,7 @@
         "file:///" + new File(
         System.getProperty("hadoop.log.dir")).getAbsolutePath()
         + File.separator + "history");
+      DONE = new Path(LOG_DIR, "done");
       JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
                                     String.valueOf(jobTrackerStartTime) + "_";
       jobtrackerHostname = hostname;
@@ -184,6 +248,9 @@
       jobHistoryBlockSize = 
         conf.getLong("mapred.jobtracker.job.history.block.size", 
                      3 * 1024 * 1024);
+      
+      // create the done folder with appropriate permission
+      JT_FS.mkdirs(DONE, HISTORY_DIR_PERMISSION);
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -405,6 +472,13 @@
   } 
   
   /**
+   * Get the history location for completed jobs
+   */
+  static Path getCompletedJobHistoryLocation() {
+    return DONE;
+  }
+  
+  /**
    * Base class contains utility stuff to manage typed key value pairs with enums. 
    */
   static class KeyValuePair{
@@ -657,13 +731,22 @@
     public static synchronized String getJobHistoryFileName(JobConf jobConf, 
                                                             JobID id) 
     throws IOException {
-      String user = getUserName(jobConf);
-      String jobName = trimJobName(getJobName(jobConf));
-      
       if (LOG_DIR == null) {
         return null;
       }
+      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
+    }
 
+    /**
+     * @param dir The directory where to search.
+     */
+    static synchronized String getJobHistoryFileName(JobConf jobConf, 
+                                                            JobID id, 
+                                                            Path dir) 
+    throws IOException {
+      String user = getUserName(jobConf);
+      String jobName = trimJobName(getJobName(jobConf));
+      
       // Make the pattern matching the job's history file
       final Pattern historyFilePattern = 
         Pattern.compile(jobtrackerHostname + "_" + DIGITS + "_" 
@@ -688,26 +771,33 @@
         }
       };
       
-      FileStatus[] statuses = JT_FS.listStatus(new Path(LOG_DIR), filter);
+      FileStatus[] statuses = JT_FS.listStatus(dir, filter);
       String filename = null;
       if (statuses.length == 0) {
         LOG.info("Nothing to recover for job " + id);
       } else {
         // return filename considering that fact the name can be a 
         // secondary filename like filename.recover
-        filename = decodeJobHistoryFileName(statuses[0].getPath().getName());
-        // Remove the '.recover' suffix if it exists
-        if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
-          int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
-          filename = filename.substring(0, newLength);
-        }
-        filename = encodeJobHistoryFileName(filename);
+        filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName);
         LOG.info("Recovered job history filename for job " + id + " is " 
                  + filename);
       }
       return filename;
     }
     
+    // removes all extra extensions from a filename and returns the core/primary
+    // filename
+    private static String getPrimaryFilename(String filename, String jobName) 
+    throws IOException{
+      filename = decodeJobHistoryFileName(filename);
+      // Remove the '.recover' suffix if it exists
+      if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
+        int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
+        filename = filename.substring(0, newLength);
+      }
+      return encodeJobHistoryFileName(filename);
+    }
+    
     /** Since there was a restart, there should be a master file and 
      * a recovery file. Once the recovery is complete, the master should be 
      * deleted as an indication that the recovery file should be treated as the 
@@ -802,31 +892,31 @@
 
     /** Finalize the recovery and make one file in the end. 
      * This involves renaming the recover file to the master file.
+     * Note that this api should be invoked only if recovery is involved.
      * @param id Job id  
      * @param conf the job conf
      * @throws IOException
      */
     static synchronized void finalizeRecovery(JobID id, JobConf conf) 
     throws IOException {
-      String masterLogFileName = 
-        JobHistory.JobInfo.getJobHistoryFileName(conf, id);
-      if (masterLogFileName == null) {
+      Path tmpLogPath = fileManager.getHistoryFile(id);
+      if (tmpLogPath == null) {
+        LOG.debug("No file for job with " + id + " found in cache!");
         return;
       }
-      Path masterLogPath = 
-        JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
-      String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
-      Path tmpLogPath = 
-        JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
-      if (masterLogPath != null) {
-
-        // rename the tmp file to the master file. Note that this should be 
-        // done only when the file is closed and handles are released.
-        if(JT_FS.exists(tmpLogPath)) {
-          LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
-          JT_FS.rename(tmpLogPath, masterLogPath);
-        }
-      }
+      String tmpLogFileName = tmpLogPath.getName();
+      
+      // get the primary filename from the cached filename
+      String masterLogFileName = 
+        getPrimaryFilename(tmpLogFileName, getJobName(conf));
+      Path masterLogPath = new Path(tmpLogPath.getParent(), masterLogFileName);
+      
+      // rename the tmp file to the master file. Note that this should be 
+      // done only when the file is closed and handles are released.
+      LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
+      JT_FS.rename(tmpLogPath, masterLogPath);
+      // update the cache
+      fileManager.setHistoryFile(id, masterLogPath);
       
       // do the same for the user file too
       masterLogPath = 
@@ -859,6 +949,37 @@
     }
 
     /**
+     * Move the completed job into the completed folder.
+     * This assumes that the jobhistory file is closed and all operations on the
+     * jobhistory file are complete.
+     * This *should* be the last call to jobhistory for a given job.
+     */
+     static void markCompleted(JobID id) throws IOException {
+       Path path = fileManager.getHistoryFile(id);
+       if (path == null) {
+         LOG.info("No file for job-history with " + id + " found in cache!");
+         return;
+       }
+       Path newPath = new Path(DONE, path.getName());
+       LOG.info("Moving completed job from " + path + " to " + newPath);
+       JT_FS.rename(path, newPath);
+
+       Path confPath = fileManager.getConfFileWriters(id);
+       if (confPath == null) {
+         LOG.info("No file for jobconf with " + id + " found in cache!");
+         return;
+       }
+       // move the conf too
+       newPath = new Path(DONE, confPath.getName());
+       LOG.info("Moving configuration of completed job from " + confPath 
+                + " to " + newPath);
+       JT_FS.rename(confPath, newPath);
+
+       // purge the job from the cache
+       fileManager.purgeJob(id);
+     }
+
+     /**
      * Log job submitted event to history. Creates a new file in history 
      * for the job. if history file creation fails, it disables history 
      * for all other events. 
@@ -898,7 +1019,12 @@
           if (logFileName == null) {
             logFileName =
               encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
-            
+          } else {
+            String parts[] = logFileName.split("_");
+            //TODO this is a hack :(
+            // jobtracker-hostname_jobtracker-identifier_
+            String jtUniqueString = parts[0] + "_" + parts[1] + "_";
+            jobUniqueString = jtUniqueString + jobId.toString();
           }
         } else {
           logFileName = 
@@ -913,7 +1039,6 @@
           getJobHistoryLogLocationForUser(logFileName, jobConf);
 
         try{
-          ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
           FSDataOutputStream out = null;
           PrintWriter writer = null;
 
@@ -933,7 +1058,10 @@
                             JT_FS.getDefaultReplication(), 
                             jobHistoryBlockSize, null);
             writer = new PrintWriter(out);
-            writers.add(writer);
+            fileManager.addWriter(jobId, writer);
+
+            // cache it ...
+            fileManager.setHistoryFile(jobId, logFile);
           }
           if (userLogFile != null) {
             // Get the actual filename as recoverJobHistoryFile() might return
@@ -947,11 +1075,10 @@
  
             out = fs.create(userLogFile, true, 4096);
             writer = new PrintWriter(out);
-            writers.add(writer);
+            fileManager.addWriter(jobId, writer);
           }
-
-          openJobs.put(jobUniqueString, writers);
           
+          ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
           // Log the history meta info
           JobHistory.MetaInfoManager.logMetaInfo(writers);
 
@@ -996,6 +1123,7 @@
       if (LOG_DIR != null) {
         jobFilePath = new Path(LOG_DIR + File.separator + 
                                jobUniqueString + "_conf.xml");
+        fileManager.setConfFile(jobId, jobFilePath);
       }
       Path userJobFilePath = null;
       if (userLogDir != null) {
@@ -1027,7 +1155,7 @@
                     + jobFilePath + "and" + userJobFilePath );
         }
       } catch (IOException ioe) {
-        LOG.error("Failed to store job conf on the local filesystem ", ioe);
+        LOG.error("Failed to store job conf in the log dir", ioe);
       } finally {
         if (jobFileOut != null) {
           try {
@@ -1050,8 +1178,7 @@
     public static void logInited(JobID jobId, long startTime, 
                                  int totalMaps, int totalReduces) {
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
@@ -1087,8 +1214,7 @@
      */
     public static void logStarted(JobID jobId){
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
@@ -1115,8 +1241,7 @@
                                    Counters counters){
       if (!disableHistory){
         // close job file for this job
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,          
@@ -1135,7 +1260,6 @@
           for (PrintWriter out : writer) {
             out.close();
           }
-          openJobs.remove(logFileKey); 
         }
         Thread historyCleaner  = new Thread(new HistoryCleaner());
         historyCleaner.start(); 
@@ -1150,8 +1274,7 @@
      */
     public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
@@ -1161,7 +1284,6 @@
           for (PrintWriter out : writer) {
             out.close();
           }
-          openJobs.remove(logFileKey); 
         }
       }
     }
@@ -1180,8 +1302,7 @@
     public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
         int finishedReduces) {
       if (!disableHistory) {
-        String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
 
         if (null != writer) {
           JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
@@ -1192,7 +1313,6 @@
           for (PrintWriter out : writer) {
             out.close();
           }
-          openJobs.remove(logFileKey);
         }
       }
     }
@@ -1203,8 +1323,7 @@
      */
     public static void logJobPriority(JobID jobid, JobPriority priority){
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
@@ -1229,8 +1348,7 @@
     public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
     {
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
@@ -1261,8 +1379,8 @@
     public static void logStarted(TaskID taskId, String taskType, 
                                   long startTime, String splitLocations) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
@@ -1283,8 +1401,8 @@
     public static void logFinished(TaskID taskId, String taskType, 
                                    long finishTime, Counters counters){
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id);
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
@@ -1305,8 +1423,8 @@
      */
     public static void logUpdates(TaskID taskId, long finishTime){
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
@@ -1335,8 +1453,8 @@
                                  String error, 
                                  TaskAttemptID failedDueToAttempt){
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           String failedAttempt = failedDueToAttempt == null
@@ -1397,8 +1515,8 @@
                                   String trackerName, int httpPort, 
                                   String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1474,8 +1592,8 @@
                                    String stateString, 
                                    Counters counter) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1525,8 +1643,8 @@
                                  long timestamp, String hostName, 
                                  String error, String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1571,8 +1689,8 @@
                                  long timestamp, String hostName,
                                  String error, String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1623,8 +1741,8 @@
                                   int httpPort, 
                                   String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1678,8 +1796,8 @@
                                    String hostName, String taskType,
                                    String stateString, Counters counter) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1729,8 +1847,8 @@
                                  String hostName, String error, 
                                  String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1775,8 +1893,8 @@
                                  String hostName, String error, 
                                  String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1838,8 +1956,7 @@
       lastRan = now;  
       isRunning = true; 
       try {
-        Path logDir = new Path(LOG_DIR);
-        FileStatus[] historyFiles = JT_FS.listStatus(logDir);
+        FileStatus[] historyFiles = JT_FS.listStatus(DONE);
         // delete if older than 30 days
         if (historyFiles != null) {
           for (FileStatus f : historyFiles) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=789316&r1=789315&r2=789316&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jun 29 13:21:31 2009
@@ -1684,7 +1684,7 @@
     String historyLogDir = null;
     FileSystem historyFS = null;
     if (historyInitialized) {
-      historyLogDir = conf.get("hadoop.job.history.location");
+      historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
       infoServer.setAttribute("historyLogDir", historyLogDir);
       historyFS = new Path(historyLogDir).getFileSystem(conf);
       infoServer.setAttribute("fileSys", historyFS);
@@ -2174,6 +2174,13 @@
       }
     }
 
+    // mark the job as completed
+    try {
+      JobHistory.JobInfo.markCompleted(id);
+    } catch (IOException ioe) {
+      LOG.info("Failed to mark job " + id + " as completed!", ioe);
+    }
+
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=789316&r1=789315&r2=789316&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Mon Jun 29 13:21:31 2009
@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobHistory.*;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -433,6 +434,20 @@
   }
 
   /**
+   * Returns the path of the conf file corresponding to a job history file
+   * in the given directory.
+   * @param path path of the jobhistory file
+   * @param dir the directory to look for the conf file in
+   */
+  private static Path getPathForConf(Path path, Path dir) {
+    String parts[] = path.getName().split("_");
+    //TODO this is a hack :(
+    // jobtracker-hostname_jobtracker-identifier_
+    String id = parts[2] + "_" + parts[3] + "_" + parts[4];
+    String jobUniqueString = parts[0] + "_" + parts[1] + "_" +  id;
+    return new Path(dir, jobUniqueString + "_conf.xml");
+  }
+
+  /**
    *  Validates the format of contents of history file
    *  (1) history file exists and in correct location
    *  (2) Verify if the history file is parsable
@@ -458,10 +473,12 @@
                  String status, boolean splitsCanBeEmpty) throws IOException  {
 
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path dir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = 
+      JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
 
     // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    Path logFile = new Path(dir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
  
     // Check if the history file exists
@@ -753,11 +770,13 @@
                               RunningJob job, JobConf conf) throws IOException  {
 
     JobID id = job.getID();
+    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  doneDir);
 
     // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    Path logFile = new Path(doneDir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
  
     // Check if the history file exists
@@ -813,6 +832,43 @@
 
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+      
+      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      JobID id = job.getID();
+      String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                    doneDir);
+      // Framework history log file location
+      Path logFile = new Path(doneDir, logFileName);
+      FileSystem fileSys = logFile.getFileSystem(conf);
+   
+      // Check if the history file exists
+      assertTrue("History file does not exist", fileSys.exists(logFile));
+
+      // check if the corresponding conf file exists
+      Path confFile = getPathForConf(logFile, doneDir);
+      assertTrue("Config for completed jobs doesnt exist", 
+                 fileSys.exists(confFile));
+
+      // check if the file exists in a done folder
+      assertTrue("Completed job config doesnt exist in the done folder", 
+                 "done".equals(confFile.getParent().getName()));
+
+      // check if the file exists in a done folder
+      assertTrue("Completed jobs doesnt exist in the done folder", 
+                 "done".equals(logFile.getParent().getName()));
+      
+
+      // check if the job file is removed from the history location 
+      Path runningJobsHistoryFolder = logFile.getParent().getParent();
+      Path runningJobHistoryFilename = 
+        new Path(runningJobsHistoryFolder, logFile.getName());
+      Path runningJobConfFilename = 
+        new Path(runningJobsHistoryFolder, confFile.getName());
+      assertFalse("History file not deleted from the running folder", 
+                  fileSys.exists(runningJobHistoryFilename));
+      assertFalse("Config for completed jobs not deleted from running folder", 
+                  fileSys.exists(runningJobConfFilename));
+
       validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
       validateJobHistoryFileContent(mr, job, conf);
 
@@ -852,7 +908,9 @@
   private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
           throws IOException  {
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  doneDir);
 
     // User history log file location
     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -962,10 +1020,12 @@
           String status) throws IOException  {
 
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  doneDir);
 
     // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    Path logFile = new Path(doneDir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
  
     // Check if the history file exists

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=789316&r1=789315&r2=789316&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java Mon Jun 29 13:21:31 2009
@@ -317,7 +317,9 @@
   private void testJobHistoryFiles(JobID id, JobConf conf) 
   throws IOException  {
     // Get the history files for users
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path dir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  dir);
     String tempLogFileName = 
       JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
     
@@ -337,7 +339,7 @@
     
     // II. Framework files
     // Get the history file
-    logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    logFile = new Path(dir, logFileName);
     fileSys = logFile.getFileSystem(conf);
     
     // Check if the history file exists


