hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r788665 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Fri, 26 Jun 2009 12:03:23 GMT
Author: sharad
Date: Fri Jun 26 12:03:22 2009
New Revision: 788665

URL: http://svn.apache.org/viewvc?rev=788665&view=rev
Log:
MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore. Contributed by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=788665&r1=788664&r2=788665&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun 26 12:03:22 2009
@@ -48,3 +48,6 @@
 
     MAPREDUCE-130. Delete the jobconf copy from the log directory of the 
     JobTracker when the job is retired. (Amar Kamat via sharad)
+
+    MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore.
+    (Amar Kamat via sharad)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=788665&r1=788664&r2=788665&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Fri Jun 26 12:03:22 2009
@@ -51,12 +51,11 @@
   private static long HOUR = 1000 * 60 * 60;
   private static long SLEEP_TIME = 1 * HOUR;
 
-  CompletedJobStatusStore(Configuration conf, FileSystem fs) throws IOException {
+  CompletedJobStatusStore(Configuration conf) throws IOException {
     active =
       conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
 
     if (active) {
-      this.fs = fs;
       retainTime =
         conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
 
@@ -64,6 +63,9 @@
         conf.get("mapred.job.tracker.persist.jobstatus.dir", JOB_INFO_STORE_DIR);
 
       Path path = new Path(jobInfoDir);
+      
+      // set the fs
+      this.fs = path.getFileSystem(conf);
       if (!fs.exists(path)) {
         fs.mkdirs(path);
       }
@@ -72,6 +74,10 @@
         // as retain time is zero, all stored jobstatuses are deleted.
         deleteJobStatusDirs();
       }
+      LOG.info("Completed job store activated/configured with retain-time : " 
+               + retainTime + " , job-info-dir : " + jobInfoDir);
+    } else {
+      LOG.info("Completed job store is inactive");
     }
   }
 
@@ -100,14 +106,13 @@
 
   private void deleteJobStatusDirs() {
     try {
-      long currentTime = System.currentTimeMillis();
-      FileStatus[] jobInfoFiles = fs.listStatus(
-              new Path[]{new Path(jobInfoDir)});
-
+      long currentTime = JobTracker.getClock().getTime();
       //noinspection ForLoopReplaceableByForEach
-      for (FileStatus jobInfo : jobInfoFiles) {
+      for (FileStatus jobInfo : fs.listStatus(new Path(jobInfoDir))) {
         try {
           if ((currentTime - jobInfo.getModificationTime()) > retainTime) {
+            LOG.info("Retiring job status from the store [" + jobInfo.getPath() 
+                     + "]");
             fs.delete(jobInfo.getPath(), true);
           }
         }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=788665&r1=788664&r2=788665&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jun 26 12:03:22 2009
@@ -585,7 +585,7 @@
     }
         
     // set the launch time
-    this.launchTime = jobtracker.getClock().getTime();
+    this.launchTime = JobTracker.getClock().getTime();
 
     //
     // Create reduce tasks
@@ -1890,7 +1890,7 @@
     if (list.isEmpty()) {
       return null;
     }
-    long now = jobtracker.getClock().getTime();
+    long now = JobTracker.getClock().getTime();
     if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) {
       return null;
     }
@@ -2597,7 +2597,7 @@
       if (reduces.length == 0) {
         this.status.setReduceProgress(1.0f);
       }
-      this.finishTime = jobtracker.getClock().getTime();
+      this.finishTime = JobTracker.getClock().getTime();
       cancelReservedSlots();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
@@ -2620,7 +2620,7 @@
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
                                     status.getJobPriority());
-        this.finishTime = jobtracker.getClock().getTime();
+        this.finishTime = JobTracker.getClock().getTime();
         JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2628,7 +2628,7 @@
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.KILLED,
                                     status.getJobPriority());
-        this.finishTime = jobtracker.getClock().getTime();
+        this.finishTime = JobTracker.getClock().getTime();
         JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2985,10 +2985,10 @@
     // update the actual start-time of the attempt
     TaskStatus oldStatus = tip.getTaskStatus(taskid); 
     long startTime = oldStatus == null
-                     ? jobtracker.getClock().getTime()
+                     ? JobTracker.getClock().getTime()
                      : oldStatus.getStartTime();
     status.setStartTime(startTime);
-    status.setFinishTime(jobtracker.getClock().getTime());
+    status.setFinishTime(JobTracker.getClock().getTime());
     boolean wasComplete = tip.isComplete();
     updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=788665&r1=788664&r2=788665&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jun 26 12:03:22 2009
@@ -142,7 +142,7 @@
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
   
-  private Clock clock;
+  private static Clock clock;
 
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -170,7 +170,7 @@
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
     
-  public Clock getClock() {
+  static Clock getClock() {
     return clock;
   }
   
@@ -1794,7 +1794,7 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
   }
 
   private static SimpleDateFormat getDateFormat() {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=788665&r1=788664&r2=788665&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Jun 26 12:03:22 2009
@@ -583,7 +583,7 @@
       taskStatuses.put(taskid, status);
       if ((isMapTask() && job.hasSpeculativeMaps()) || 
           (!isMapTask() && job.hasSpeculativeReduces())) {
-        long now = jobtracker.getClock().getTime();
+        long now = JobTracker.getClock().getTime();
         double oldProgRate = getOldProgressRate();
         double currProgRate = getCurrentProgressRate(now);
         job.updateStatistics(oldProgRate, currProgRate, isMapTask());
@@ -649,7 +649,7 @@
 
       // tasktracker went down and failed time was not reported. 
       if (0 == status.getFinishTime()){
-        status.setFinishTime(jobtracker.getClock().getTime());
+        status.setFinishTime(JobTracker.getClock().getTime());
       }
     }
 
@@ -753,7 +753,7 @@
     //
 
     this.completes++;
-    this.execFinishTime = jobtracker.getClock().getTime();
+    this.execFinishTime = JobTracker.getClock().getTime();
     recomputeProgress();
     
   }
@@ -792,7 +792,7 @@
     }
     this.failed = true;
     killed = true;
-    this.execFinishTime = jobtracker.getClock().getTime();
+    this.execFinishTime = JobTracker.getClock().getTime();
     recomputeProgress();
   }
 
@@ -939,7 +939,7 @@
 
     //keep track of the last time we started an attempt at this TIP
     //used to calculate the progress rate of this TIP
-    setDispatchTime(jobtracker.getClock().getTime());
+    setDispatchTime(JobTracker.getClock().getTime());
  
     //set this the first time we run a taskAttempt in this TIP
     //each Task attempt has its own TaskStatus, which tracks that

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=788665&r1=788664&r2=788665&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java Fri Jun 26 12:03:22 2009
@@ -22,11 +22,16 @@
 import java.io.Writer;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
+  static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "job-status-persistence");
+  
   private JobID runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
     Writer wr = new OutputStreamWriter(os);
@@ -103,4 +108,29 @@
     }
   }
 
+  /**
+   * Test if the completed job status is persisted to localfs.
+   */
+  public void testLocalPersistency() throws Exception {
+    FileSystem fs = FileSystem.getLocal(createJobConf());
+    
+    fs.delete(TEST_DIR, true);
+    
+    Properties config = new Properties();
+    config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
+    config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
+    config.setProperty("mapred.job.tracker.persist.jobstatus.dir", 
+                       fs.makeQualified(TEST_DIR).toString());
+    stopCluster();
+    startCluster(false, config);
+    JobID jobId = runJob();
+    JobClient jc = new JobClient(createJobConf());
+    RunningJob rj = jc.getJob(jobId);
+    assertNotNull(rj);
+    
+    // check if the local fs has the data
+    Path jobInfo = new Path(TEST_DIR, rj.getID() + ".info");
+    assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
+    fs.delete(TEST_DIR, true);
+  }
 }



Mime
View raw message