hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r772884 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobHistory.java src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobTracker.java
Date: Fri, 08 May 2009 08:33:13 GMT
Author: ddas
Date: Fri May  8 08:33:13 2009
New Revision: 772884

URL: http://svn.apache.org/viewvc?rev=772884&view=rev
Log:
HADOOP-4372. Improves the way history filenames are obtained and manipulated. Contributed
by Amar Kamat.
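
In short, the patch lets JobHistory.JobInfo.logSubmitted() take an explicit "restarted" flag, keeping the old four-argument form as a deprecated wrapper. A minimal sketch of that deprecate-and-delegate pattern, using simplified stand-in signatures rather than the real JobHistory API:

// Sketch of the deprecate-and-delegate overload pattern used in this patch.
// Signatures are simplified stand-ins, not the real JobHistory.JobInfo API.
public class LogSubmittedSketch {

  /** Old entry point: kept for compatibility, forwards with restarted = true
   *  so existing callers keep the previous, recovery-aware behaviour. */
  @Deprecated
  public static void logSubmitted(String jobId, long submitTime) {
    logSubmitted(jobId, submitTime, true);
  }

  /** New entry point: the caller states whether the job tracker restarted. */
  public static void logSubmitted(String jobId, long submitTime,
                                  boolean restarted) {
    System.out.println("job " + jobId + " submitted at " + submitTime
        + (restarted ? " (restarted: reuse/recover the old history file)"
                     : " (fresh submission: generate a new history file)"));
  }

  public static void main(String[] args) {
    logSubmitted("job_200905080833_0001", System.currentTimeMillis(), false);
  }
}

Delegating with restarted = true preserves the old always-attempt-recovery behaviour for any caller still on the deprecated overload.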

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May  8 08:33:13 2009
@@ -320,6 +320,9 @@
     HADOOP-5080. Add new test cases to TestMRCLI and TestHDFSCLI
     (V.Karthikeyan via nigel)
 
+    HADOOP-4372. Improves the way history filenames are obtained and manipulated.
+    (Amar Kamat via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri May  8 08:33:13 2009
@@ -665,12 +665,9 @@
       };
       
       FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
-      String filename;
+      String filename = null;
       if (statuses.length == 0) {
-        filename = 
-          encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, id));
-        LOG.info("Nothing to recover! Generating a new filename " + filename 
-                 + " for job " + id);
+        LOG.info("Nothing to recover for job " + id);
       } else {
         // return filename considering that fact the name can be a 
         // secondary filename like filename.recover
@@ -791,6 +788,9 @@
     throws IOException {
       String masterLogFileName = 
         JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+      if (masterLogFileName == null) {
+        return;
+      }
       Path masterLogPath = 
         JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
       String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
@@ -833,10 +833,19 @@
      * @param jobConfPath path to job conf xml file in HDFS.
      * @param submitTime time when job tracker received the job
      * @throws IOException
+     * @deprecated Use 
+     *     {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead.
      */
     public static void logSubmitted(JobID jobId, JobConf jobConf, 
                                     String jobConfPath, long submitTime) 
     throws IOException {
+      logSubmitted(jobId, jobConf, jobConfPath, submitTime, true);
+    }
+    
+    public static void logSubmitted(JobID jobId, JobConf jobConf, 
+                                    String jobConfPath, long submitTime, 
+                                    boolean restarted) 
+    throws IOException {
       FileSystem fs = null;
       String userLogDir = null;
       String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
@@ -849,8 +858,13 @@
         String user = getUserName(jobConf);
         
         // get the history filename
-        String logFileName = 
-          getJobHistoryFileName(jobConf, jobId);
+        String logFileName = null;
+        if (restarted) {
+          logFileName = getJobHistoryFileName(jobConf, jobId);
+        } else {
+          logFileName = 
+            encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+        }
 
         // setup the history log file for this job
         Path logFile = getJobHistoryLogLocation(logFileName);
@@ -868,8 +882,10 @@
             // create output stream for logging in hadoop.job.history.location
             fs = new Path(LOG_DIR).getFileSystem(jobConf);
             
-            logFile = recoverJobHistoryFile(jobConf, logFile);
-            logFileName = logFile.getName();
+            if (restarted) {
+              logFile = recoverJobHistoryFile(jobConf, logFile);
+              logFileName = logFile.getName();
+            }
             
             int defaultBufferSize = 
               fs.getConf().getInt("io.file.buffer.size", 4096);
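
The hunks above change two things: getJobHistoryFileName() now returns null when no pre-existing history file is found (rather than minting a new name on the spot, finalizeRecovery() simply returns in that case), and logSubmitted() only runs the recovery path when the job tracker actually restarted. A rough sketch of that selection logic, with hypothetical helpers standing in for JobHistory's private methods:

// Sketch of the filename-selection branch introduced above. The helper
// methods are hypothetical stand-ins for JobHistory's private methods.
public class HistoryFileNameSketch {

  /** Stand-in for getJobHistoryFileName(): null when nothing to recover. */
  static String findExistingHistoryFile(String jobId) {
    return null; // pretend no pre-restart history file was found
  }

  /** Stand-in for encodeJobHistoryFileName(getNewJobHistoryFileName(...)). */
  static String newHistoryFileName(String jobId) {
    return "jobtracker-hostname-1234_" + jobId + "_user_jobname";
  }

  /** Mirrors the restarted/fresh branch in the patched logSubmitted(). */
  static String pickHistoryFileName(String jobId, boolean restarted) {
    if (restarted) {
      // restarted job tracker: look up (and later recover) the old file
      return findExistingHistoryFile(jobId);
    }
    // fresh submission: always generate a brand new name
    return newHistoryFileName(jobId);
  }

  public static void main(String[] args) {
    System.out.println(pickHistoryFileName("job_200905080833_0001", false));
    System.out.println(pickHistoryFileName("job_200905080833_0001", true));
  }
}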

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri May  8 08:33:13 2009
@@ -365,6 +365,10 @@
     return tasksInited.get();
   }
   
+  boolean hasRestarted() {
+    return restartCount > 0;
+  }
+
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
@@ -384,7 +388,7 @@
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
-                                    this.startTime);
+                                    this.startTime, hasRestarted());
     // log the job priority
     setPriority(this.priority);
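
On the caller side, JobInProgress now exposes hasRestarted() (true once restartCount is positive) and passes it into logSubmitted(). A small self-contained sketch of that wiring, with stand-in names rather than the real fields and API:

// Sketch of the caller side: a restart counter decides which logSubmitted()
// branch is taken. Names are simplified stand-ins, not the real JobInProgress.
public class JobInProgressSketch {
  private final int restartCount;

  JobInProgressSketch(int restartCount) {
    this.restartCount = restartCount;
  }

  /** Mirrors JobInProgress.hasRestarted(). */
  boolean hasRestarted() {
    return restartCount > 0;
  }

  /** Stand-in for JobHistory.JobInfo.logSubmitted(..., restarted). */
  static void logSubmitted(String jobId, long submitTime, boolean restarted) {
    System.out.println("logSubmitted(" + jobId + ", " + submitTime
        + ", restarted=" + restarted + ")");
  }

  void initTasks() {
    // log job info, now stating explicitly whether this is a recovered job
    logSubmitted("job_200905080833_0001", System.currentTimeMillis(),
                 hasRestarted());
  }

  public static void main(String[] args) {
    new JobInProgressSketch(0).initTasks();  // fresh submission
    new JobInProgressSketch(1).initTasks();  // job tracker restarted once
  }
}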
     

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri May  8 08:33:13 2009
@@ -1249,18 +1249,23 @@
           // 3. Get the log file and the file path
           String logFileName = 
             JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          Path jobHistoryFilePath = 
-            JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
-          // 4. Recover the history file. This involved
-          //     - deleting file.recover if file exists
-          //     - renaming file.recover to file if file doesnt exist
-          // This makes sure that the (master) file exists
-          JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                   jobHistoryFilePath);
+          if (logFileName != null) {
+            Path jobHistoryFilePath = 
+              JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+
+            // 4. Recover the history file. This involved
+            //     - deleting file.recover if file exists
+            //     - renaming file.recover to file if file doesnt exist
+            // This makes sure that the (master) file exists
+            JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
+                                                     jobHistoryFilePath);
           
-          // 5. Cache the history file name as it costs one dfs access
-          jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+            // 5. Cache the history file name as it costs one dfs access
+            jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+          } else {
+            LOG.info("No history file found for job " + id);
+            idIter.remove(); // remove from recovery list
+          }
 
           // 6. Sumbit the job to the jobtracker
           addJob(id, job);
@@ -2045,10 +2050,12 @@
 
     // start the merge of log files
     JobID id = job.getStatus().getJobID();
-    try {
-      JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
-    } catch (IOException ioe) {
-      LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+    if (job.hasRestarted()) {
+      try {
+        JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
+      } catch (IOException ioe) {
+        LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+      }
     }
 
     final JobTrackerInstrumentation metrics = getInstrumentation();
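
In the recovery loop above, jobs with no history file are now dropped from the recovery list through the iterator, and finalizeRecovery() is only attempted for jobs that actually went through a restart. A standalone sketch of the skip-and-remove pattern, using plain strings in place of JobID and the real recovery structures:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the skip-and-remove pattern from the recovery loop. All names are
// simplified stand-ins for the JobTracker internals.
public class RecoverySketch {

  /** Stand-in for JobHistory.JobInfo.getJobHistoryFileName(): may be null. */
  static String historyFileNameFor(String jobId) {
    return jobId.endsWith("_0002") ? null : jobId + "_user_jobname";
  }

  public static void main(String[] args) {
    List<String> toRecover = new ArrayList<>(
        List.of("job_200905080833_0001", "job_200905080833_0002"));

    for (Iterator<String> idIter = toRecover.iterator(); idIter.hasNext();) {
      String id = idIter.next();
      String logFileName = historyFileNameFor(id);
      if (logFileName != null) {
        System.out.println("recovering history file " + logFileName);
      } else {
        System.out.println("No history file found for job " + id);
        idIter.remove(); // nothing to recover: drop it from the recovery list
      }
    }
    System.out.println("jobs still in recovery list: " + toRecover);
  }
}

Removing through the iterator avoids a ConcurrentModificationException while the list is being walked, which is presumably why the patch removes via idIter rather than from the collection directly.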


