hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1076948 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 03:24:59 GMT
Author: omalley
Date: Fri Mar  4 03:24:59 2011
New Revision: 1076948

URL: http://svn.apache.org/viewvc?rev=1076948&view=rev
Log:
commit a9d789e37205a8b111b98030d82aa722603fc441
Author: Lee Tucker <ltucker@yahoo-inc.com>
Date:   Thu Jul 30 17:40:31 2009 -0700

    Applying patch 2808668.mr416.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
Fri Mar  4 03:24:59 2011
@@ -92,17 +92,79 @@ public class JobHistory {
   public static final int JOB_NAME_TRIM_LENGTH = 50;
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
-  private static Map<String, ArrayList<PrintWriter>> openJobs = 
-                     new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false; 
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
   private static String jobtrackerHostname;
+  private static JobHistoryFilesManager fileManager = 
+    new JobHistoryFilesManager();
   final static FsPermission HISTORY_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0750); // rwxr-x---
   final static FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
   private static JobConf jtConf;
+  private static Path DONE = null; // folder for completed jobs
+  /**
+   * A class that manages all the files related to a job. For now it tracks:
+   *   - writers : list of open files
+   *   - job history filename
+   *   - job conf filename
+   */
+  private static class JobHistoryFilesManager {
+    // a private (virtual) folder for all the files related to a running job
+    private static class FilesHolder {
+      ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
+      Path historyFilename; // path of job history file
+      Path confFilename; // path of job's conf
+    }
+    
+   // cache from job-key to files associated with it.
+    private Map<JobID, FilesHolder> fileCache = 
+      new ConcurrentHashMap<JobID, FilesHolder>();
+
+    private FilesHolder getFileHolder(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+     if (holder == null) {
+         holder = new FilesHolder();
+         fileCache.put(id, holder);
+      }
+      return holder;
+    }
+
+    void addWriter(JobID id, PrintWriter writer) {
+      FilesHolder holder = getFileHolder(id);
+      holder.writers.add(writer);
+    }
+
+    void setHistoryFile(JobID id, Path file) {
+      FilesHolder holder = getFileHolder(id);
+      holder.historyFilename = file;
+    }
+
+    void setConfFile(JobID id, Path file) {
+      FilesHolder holder = getFileHolder(id);
+      holder.confFilename = file;
+    }
+
+    ArrayList<PrintWriter> getWriters(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      return holder == null ? null : holder.writers;
+    }
+
+    Path getHistoryFile(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      return holder == null ? null : holder.historyFilename;
+    }
+
+    Path getConfFileWriters(JobID id) {
+      FilesHolder holder = fileCache.get(id);
+      return holder == null ? null : holder.confFilename;
+    }
+
+    void purgeJob(JobID id) {
+      fileCache.remove(id);
+    }
+  }
   /**
    * Record types are identifiers for each line of log in history files. 
    * A record type appears as the first token in a single line of log. 
@@ -152,6 +214,7 @@ public class JobHistory {
         "file:///" + new File(
         System.getProperty("hadoop.log.dir")).getAbsolutePath()
         + File.separator + "history");
+      DONE = new Path(LOG_DIR, "done");
       JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
                                     String.valueOf(jobTrackerStartTime) + "_";
       jobtrackerHostname = hostname;
@@ -169,6 +232,9 @@ public class JobHistory {
         conf.getLong("mapred.jobtracker.job.history.block.size", 
                      3 * 1024 * 1024);
       jtConf = conf;
+
+      // create the done folder with appropriate permission
+      fs.mkdirs(DONE, HISTORY_DIR_PERMISSION);
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -388,6 +454,13 @@ public class JobHistory {
   } 
   
   /**
+   * Get the history location for completed jobs
+   */
+  static Path getCompletedJobHistoryLocation() {
+    return DONE;
+  }
+  
+  /**
    * Base class contais utility stuff to manage types key value pairs with enums. 
    */
   static class KeyValuePair{
@@ -639,6 +712,16 @@ public class JobHistory {
      */
     public static synchronized String getJobHistoryFileName(JobConf jobConf, 
                                                             JobID id) 
+    throws IOException {                    
+      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
+    }
+
+    /**
+     * @param dir The directory where to search.
+     */
+    static synchronized String getJobHistoryFileName(JobConf jobConf, 
+                                                            JobID id, 
+                                                            Path dir) 
     throws IOException {
       String user = getUserName(jobConf);
       String jobName = trimJobName(getJobName(jobConf));
@@ -672,26 +755,33 @@ public class JobHistory {
         }
       };
       
-      FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
+      FileStatus[] statuses = fs.listStatus(dir, filter);
       String filename = null;
       if (statuses.length == 0) {
         LOG.info("Nothing to recover for job " + id);
       } else {
         // return filename considering that fact the name can be a 
         // secondary filename like filename.recover
-        filename = decodeJobHistoryFileName(statuses[0].getPath().getName());
-        // Remove the '.recover' suffix if it exists
-        if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
-          int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
-          filename = filename.substring(0, newLength);
-        }
-        filename = encodeJobHistoryFileName(filename);
+        filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName);
         LOG.info("Recovered job history filename for job " + id + " is " 
                  + filename);
       }
       return filename;
     }
     
+    // removes all extra extensions from a filename and returns the core/primary
+    // filename
+    private static String getPrimaryFilename(String filename, String jobName) 
+    throws IOException{
+      filename = decodeJobHistoryFileName(filename);
+      // Remove the '.recover' suffix if it exists
+      if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
+        int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
+        filename = filename.substring(0, newLength);
+      }
+      return encodeJobHistoryFileName(filename);
+    }    
+
     /** Since there was a restart, there should be a master file and 
      * a recovery file. Once the recovery is complete, the master should be 
      * deleted as an indication that the recovery file should be treated as the 
@@ -788,33 +878,33 @@ public class JobHistory {
 
     /** Finalize the recovery and make one file in the end. 
      * This invloves renaming the recover file to the master file.
+     * Note that this api should be invoked only if recovery is involved.
      * @param id Job id  
      * @param conf the job conf
      * @throws IOException
      */
-    static synchronized void finalizeRecovery(JobID id, JobConf conf) 
+    static synchronized void finalizeRecovery(JobID id, JobConf conf)
     throws IOException {
-      String masterLogFileName = 
-        JobHistory.JobInfo.getJobHistoryFileName(conf, id);
-      if (masterLogFileName == null) {
-        return;
-      }
-      Path masterLogPath = 
-        JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
-      String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
-      Path tmpLogPath = 
-        JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
-      if (masterLogPath != null) {
-        FileSystem fs = masterLogPath.getFileSystem(conf);
+       Path tmpLogPath = fileManager.getHistoryFile(id);
+       if (tmpLogPath == null) {
+         LOG.debug("No file for job with " + id + " found in cache!");
+         return;
+       }
+       String tmpLogFileName = tmpLogPath.getName();
+       
+       // get the primary filename from the cached filename
+       String masterLogFileName = 
+         getPrimaryFilename(tmpLogFileName, getJobName(conf));
+       Path masterLogPath = new Path(tmpLogPath.getParent(), masterLogFileName);
+       
+       // rename the tmp file to the master file. Note that this should be 
+       // done only when the file is closed and handles are released.
+       LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
+       FileSystem fs = tmpLogPath.getFileSystem(jtConf);
+       fs.rename(tmpLogPath, masterLogPath);
+       // update the cache
+       fileManager.setHistoryFile(id, masterLogPath);
 
-        // rename the tmp file to the master file. Note that this should be 
-        // done only when the file is closed and handles are released.
-        if(fs.exists(tmpLogPath)) {
-          LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
-          fs.rename(tmpLogPath, masterLogPath);
-        }
-      }
-      
       // do the same for the user file too
       masterLogPath = 
         JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
@@ -823,7 +913,7 @@ public class JobHistory {
         JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, 
                                                            conf);
       if (masterLogPath != null) {
-        FileSystem fs = masterLogPath.getFileSystem(conf);
+        fs = masterLogPath.getFileSystem(conf);
         if (fs.exists(tmpLogPath)) {
           LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
                    + " in user directory");
@@ -846,6 +936,38 @@ public class JobHistory {
     }
 
     /**
+     * Move the completed job into the completed folder.
+     * This assumes that the jobhistory file is closed and all operations on the
+     * jobhistory file are complete.
+     * This *should* be the last call to jobhistory for a given job.
+     */
+     static void markCompleted(JobID id) throws IOException {
+       Path path = fileManager.getHistoryFile(id);
+       if (path == null) {
+         LOG.info("No file for job-history with " + id + " found in cache!");
+         return;
+       }
+       Path newPath = new Path(DONE, path.getName());
+       LOG.info("Moving completed job from " + path + " to " + newPath);
+       FileSystem fs = path.getFileSystem(jtConf);
+       fs.rename(path, newPath);
+
+       Path confPath = fileManager.getConfFileWriters(id);
+       if (confPath == null) {
+         LOG.info("No file for jobconf with " + id + " found in cache!");
+         return;
+       }
+       // move the conf too
+       newPath = new Path(DONE, confPath.getName());
+       LOG.info("Moving configuration of completed job from " + confPath 
+                + " to " + newPath);
+       fs.rename(confPath, newPath);
+
+       // purge the job from the cache
+       fileManager.purgeJob(id);
+     }
+
+     /**
      * Log job submitted event to history. Creates a new file in history 
      * for the job. if history file creation fails, it disables history 
      * for all other events. 
@@ -885,7 +1007,12 @@ public class JobHistory {
           if (logFileName == null) {
             logFileName =
               encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
-            
+          } else {
+            String parts[] = logFileName.split("_");
+            //TODO this is a hack :(
+            // jobtracker-hostname_jobtracker-identifier_
+            String jtUniqueString = parts[0] + "_" + parts[1] + "_";
+            jobUniqueString = jtUniqueString + jobId.toString();
           }
         } else {
           logFileName = 
@@ -900,7 +1027,6 @@ public class JobHistory {
           getJobHistoryLogLocationForUser(logFileName, jobConf);
 
         try{
-          ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
           FSDataOutputStream out = null;
           PrintWriter writer = null;
 
@@ -922,7 +1048,10 @@ public class JobHistory {
                             fs.getDefaultReplication(), 
                             jobHistoryBlockSize, null);
             writer = new PrintWriter(out);
-            writers.add(writer);
+            fileManager.addWriter(jobId, writer);
+
+            // cache it ...
+            fileManager.setHistoryFile(jobId, logFile);
           }
           if (userLogFile != null) {
             // Get the actual filename as recoverJobHistoryFile() might return
@@ -936,11 +1065,10 @@ public class JobHistory {
  
             out = fs.create(userLogFile, true, 4096);
             writer = new PrintWriter(out);
-            writers.add(writer);
+            fileManager.addWriter(jobId, writer);
           }
-
-          openJobs.put(jobUniqueString, writers);
           
+          ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
           // Log the history meta info
           JobHistory.MetaInfoManager.logMetaInfo(writers);
 
@@ -985,6 +1113,7 @@ public class JobHistory {
       if (LOG_DIR != null) {
         jobFilePath = new Path(LOG_DIR + File.separator + 
                                jobUniqueString + "_conf.xml");
+        fileManager.setConfFile(jobId, jobFilePath);
       }
       Path userJobFilePath = null;
       if (userLogDir != null) {
@@ -1018,7 +1147,7 @@ public class JobHistory {
                     + jobFilePath + "and" + userJobFilePath );
         }
       } catch (IOException ioe) {
-        LOG.error("Failed to store job conf on the local filesystem ", ioe);
+        LOG.error("Failed to store job conf in the log dir", ioe);
       } finally {
         if (jobFileOut != null) {
           try {
@@ -1041,8 +1170,7 @@ public class JobHistory {
     public static void logInited(JobID jobId, long startTime, 
                                  int totalMaps, int totalReduces) {
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
@@ -1078,8 +1206,7 @@ public class JobHistory {
      */
     public static void logStarted(JobID jobId){
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
@@ -1106,8 +1233,7 @@ public class JobHistory {
                                    Counters counters){
       if (!disableHistory){
         // close job file for this job
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,          
@@ -1126,7 +1252,6 @@ public class JobHistory {
           for (PrintWriter out : writer) {
             out.close();
           }
-          openJobs.remove(logFileKey); 
         }
         Thread historyCleaner  = new Thread(new HistoryCleaner());
         historyCleaner.start(); 
@@ -1141,8 +1266,7 @@ public class JobHistory {
      */
     public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
@@ -1152,7 +1276,6 @@ public class JobHistory {
           for (PrintWriter out : writer) {
             out.close();
           }
-          openJobs.remove(logFileKey); 
         }
       }
     }
@@ -1171,8 +1294,7 @@ public class JobHistory {
     public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
         int finishedReduces) {
       if (!disableHistory) {
-        String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
 
         if (null != writer) {
           JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
@@ -1183,7 +1305,6 @@ public class JobHistory {
           for (PrintWriter out : writer) {
             out.close();
           }
-          openJobs.remove(logFileKey);
         }
       }
     }
@@ -1194,8 +1315,7 @@ public class JobHistory {
      */
     public static void logJobPriority(JobID jobid, JobPriority priority){
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
@@ -1220,8 +1340,7 @@ public class JobHistory {
     public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
     {
       if (!disableHistory){
-        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
-        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
@@ -1252,8 +1371,8 @@ public class JobHistory {
     public static void logStarted(TaskID taskId, String taskType, 
                                   long startTime, String splitLocations) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
@@ -1274,8 +1393,8 @@ public class JobHistory {
     public static void logFinished(TaskID taskId, String taskType, 
                                    long finishTime, Counters counters){
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id);
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
@@ -1296,8 +1415,8 @@ public class JobHistory {
      */
     public static void logUpdates(TaskID taskId, long finishTime){
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
@@ -1326,8 +1445,8 @@ public class JobHistory {
                                  String error, 
                                  TaskAttemptID failedDueToAttempt){
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskId.getJobID()); 
+        JobID id = taskId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           String failedAttempt = failedDueToAttempt == null
@@ -1388,8 +1507,8 @@ public class JobHistory {
                                   String trackerName, int httpPort, 
                                   String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1438,8 +1557,8 @@ public class JobHistory {
                                    String stateString, 
                                    Counters counter) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1487,8 +1606,8 @@ public class JobHistory {
                                  long timestamp, String hostName, 
                                  String error, String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1533,8 +1652,8 @@ public class JobHistory {
                                  long timestamp, String hostName,
                                  String error, String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
@@ -1585,8 +1704,8 @@ public class JobHistory {
                                   int httpPort, 
                                   String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1640,8 +1759,8 @@ public class JobHistory {
                                    String hostName, String taskType,
                                    String stateString, Counters counter) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1691,8 +1810,8 @@ public class JobHistory {
                                  String hostName, String error, 
                                  String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1737,8 +1856,8 @@ public class JobHistory {
                                  String hostName, String error, 
                                  String taskType) {
       if (!disableHistory){
-        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                   + taskAttemptId.getJobID()); 
+        JobID id = taskAttemptId.getJobID();
+        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -1800,9 +1919,8 @@ public class JobHistory {
       lastRan = now;  
       isRunning = true; 
       try {
-        Path logDir = new Path(LOG_DIR);
-        FileSystem fs = logDir.getFileSystem(jtConf);
-        FileStatus[] historyFiles = fs.listStatus(logDir);
+        FileSystem fs = DONE.getFileSystem(jtConf);
+        FileStatus[] historyFiles = fs.listStatus(DONE);
         // delete if older than 30 days
         if (historyFiles != null) {
           for (FileStatus f : historyFiles) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar  4 03:24:59 2011
@@ -1718,7 +1718,7 @@ public class JobTracker implements MRCon
     String historyLogDir = null;
     FileSystem historyFS = null;
     if (historyInitialized) {
-      historyLogDir = conf.get("hadoop.job.history.location");
+      historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
       infoServer.setAttribute("historyLogDir", historyLogDir);
       historyFS = new Path(historyLogDir).getFileSystem(conf);
       infoServer.setAttribute("fileSys", historyFS);
@@ -2197,6 +2197,13 @@ public class JobTracker implements MRCon
       }
     }
 
+    // mark the job as completed
+    try {
+      JobHistory.JobInfo.markCompleted(id);
+    } catch (IOException ioe) {
+      LOG.info("Failed to mark job " + id + " as completed!", ioe);
+    }
+
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
Fri Mar  4 03:24:59 2011
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobHistory.*;
 import org.apache.commons.logging.Log;
@@ -423,6 +424,20 @@ public class TestJobHistory extends Test
   }
 
   /**
+   * Returns the path of the conf file that corresponds to a jobhistory file
+   * @param path path of the jobhistory file
+   * @param dir directory under which the conf file path is built
+   */
+  private static Path getPathForConf(Path path, Path dir) {
+    String parts[] = path.getName().split("_");
+    //TODO this is a hack :(
+    // jobtracker-hostname_jobtracker-identifier_
+    String id = parts[2] + "_" + parts[3] + "_" + parts[4];
+    String jobUniqueString = parts[0] + "_" + parts[1] + "_" +  id;
+    return new Path(dir, jobUniqueString + "_conf.xml");
+  }
+
+  /**
    *  Validates the format of contents of history file
    *  (1) history file exists and in correct location
    *  (2) Verify if the history file is parsable
@@ -448,10 +463,12 @@ public class TestJobHistory extends Test
                  String status, boolean splitsCanBeEmpty) throws IOException  {
 
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path dir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = 
+      JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
 
     // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    Path logFile = new Path(dir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
  
     // Check if the history file exists
@@ -743,11 +760,13 @@ public class TestJobHistory extends Test
                               RunningJob job, JobConf conf) throws IOException  {
 
     JobID id = job.getID();
+    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  doneDir);
 
     // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    Path logFile = new Path(doneDir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
  
     // Check if the history file exists
@@ -803,6 +822,43 @@ public class TestJobHistory extends Test
 
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+      
+      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      JobID id = job.getID();
+      String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                    doneDir);
+      // Framework history log file location
+      Path logFile = new Path(doneDir, logFileName);
+      FileSystem fileSys = logFile.getFileSystem(conf);
+   
+      // Check if the history file exists
+      assertTrue("History file does not exist", fileSys.exists(logFile));
+
+      // check if the corresponding conf file exists
+      Path confFile = getPathForConf(logFile, doneDir);
+      assertTrue("Config for completed jobs doesnt exist", 
+                 fileSys.exists(confFile));
+
+      // check if the conf file is located in the done folder
+      assertTrue("Completed job config doesnt exist in the done folder", 
+                 "done".equals(confFile.getParent().getName()));
+
+      // check if the history file is located in the done folder
+      assertTrue("Completed jobs doesnt exist in the done folder", 
+                 "done".equals(logFile.getParent().getName()));
+      
+
+      // check if the job file is removed from the history location 
+      Path runningJobsHistoryFolder = logFile.getParent().getParent();
+      Path runningJobHistoryFilename = 
+        new Path(runningJobsHistoryFolder, logFile.getName());
+      Path runningJobConfFilename = 
+        new Path(runningJobsHistoryFolder, confFile.getName());
+      assertFalse("History file not deleted from the running folder", 
+                  fileSys.exists(runningJobHistoryFilename));
+      assertFalse("Config for completed jobs not deleted from running folder", 
+                  fileSys.exists(runningJobConfFilename));
+
       validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
       validateJobHistoryFileContent(mr, job, conf);
 
@@ -842,7 +898,9 @@ public class TestJobHistory extends Test
   private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
           throws IOException  {
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  doneDir);
 
     // User history log file location
     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -952,10 +1010,12 @@ public class TestJobHistory extends Test
           String status) throws IOException  {
 
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  doneDir);
 
     // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    Path logFile = new Path(doneDir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
  
     // Check if the history file exists

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Fri Mar  4 03:24:59 2011
@@ -329,7 +329,9 @@ public class TestJobTrackerRestart exten
   private void testJobHistoryFiles(JobID id, JobConf conf) 
   throws IOException  {
     // Get the history files for users
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+    Path dir = JobHistory.getCompletedJobHistoryLocation();
+    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
+                                                                  dir);
     String tempLogFileName = 
       JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
     
@@ -349,7 +351,7 @@ public class TestJobTrackerRestart exten
     
     // II. Framework files
     // Get the history file
-    logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+    logFile = new Path(dir, logFileName);
     fileSys = logFile.getFileSystem(conf);
     
     // Check if the history file exists



Mime
View raw message