hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r697286 - in /hadoop/core/trunk: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapp...
Date Fri, 19 Sep 2008 23:45:18 GMT
Author: acmurthy
Date: Fri Sep 19 16:45:18 2008
New Revision: 697286

URL: http://svn.apache.org/viewvc?rev=697286&view=rev
Log:
HADOOP-3924. Added a 'KILLED' job status. Contributed by Subramaniam Krishnan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/core/trunk/src/webapps/job/jobdetails.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 16:45:18 2008
@@ -205,6 +205,9 @@
     HADOOP-3019. A new library to support total order partitions.
     (cdouglas via omalley)
 
+    HADOOP-3924. Added a 'KILLED' job status. (Subramaniam Krishnan via
+    acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
(original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
Fri Sep 19 16:45:18 2008
@@ -34,6 +34,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -328,7 +329,8 @@
     List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
     for (JobInProgress job: infos.keySet()) { 
       int runState = job.getStatus().getRunState();
-      if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED) {
+      if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
+          || runState == JobStatus.KILLED) {
         toRemove.add(job);
       }
     }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Fri Sep 19 16:45:18 2008
@@ -31,6 +31,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 
 public class TestFairScheduler extends TestCase {
@@ -334,6 +335,7 @@
     submitJobs(1, JobStatus.PREP, 10, 10);
     submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
     submitJobs(1, JobStatus.FAILED, 10, 10);
+    submitJobs(1, JobStatus.KILLED, 10, 10);
     assertNull(scheduler.assignTasks(tracker("tt1")));
     advanceTime(100); // Check that we still don't assign jobs after an update
     assertNull(scheduler.assignTasks(tracker("tt1")));

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
Fri Sep 19 16:45:18 2008
@@ -58,7 +58,7 @@
           LOG.error("Job initialization failed:\n" +
                     StringUtils.stringifyException(t));
           if (job != null) {
-            job.kill();
+            job.fail();
           }
         }
       }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Sep 19 16:45:18
2008
@@ -62,7 +62,6 @@
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -268,7 +267,8 @@
     public synchronized boolean isComplete() throws IOException {
       updateStatus();
       return (status.getRunState() == JobStatus.SUCCEEDED ||
-              status.getRunState() == JobStatus.FAILED);
+              status.getRunState() == JobStatus.FAILED ||
+              status.getRunState() == JobStatus.KILLED);
     }
 
     /**
@@ -292,6 +292,14 @@
     }
 
     /**
+     * Tells the service to get the state of the current job.
+     */
+    public synchronized int getJobState() throws IOException {
+      updateStatus();
+      return status.getRunState();
+    }
+    
+    /**
      * Tells the service to terminate the current job.
      */
     public synchronized void killJob() throws IOException {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Sep 19 16:45:18
2008
@@ -920,6 +920,37 @@
       }
     }
     /**
+     * Logs job killed event. Closes the job history log file.
+     * 
+     * @param jobid
+     *          job id
+     * @param timestamp
+     *          time at which the job kill was issued, in ms.
+     * @param finishedMaps
+     *          number of finished map tasks.
+     * @param finishedReduces
+     *          number of finished reduce tasks.
+     */
+    public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
+        int finishedReduces) {
+      if (!disableHistory) {
+        String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+
+        if (null != writer) {
+          JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
+              Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
+              Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
+              String.valueOf(timestamp), Values.KILLED.name(),
+              String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
+          for (PrintWriter out : writer) {
+            out.close();
+          }
+          openJobs.remove(logFileKey);
+        }
+      }
+    }
+    /**
      * Log job's priority. 
      * @param jobid job id
      * @param priority Jobs priority 
@@ -936,7 +967,6 @@
         }
       }
     }
-
     /**
      * Log job's submit-time/launch-time 
      * @param jobid job id
@@ -960,6 +990,7 @@
       }
     }
   }
+  
   /**
    * Helper class for logging or reading back events related to Task's start, finish or failure.

    * All events logged by this class are logged in a separate file per job in 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Sep 19 16:45:18
2008
@@ -31,6 +31,7 @@
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -83,6 +84,7 @@
   int failedReduceTIPs = 0;
   private volatile boolean launchedCleanup = false;
   private volatile boolean jobKilled = false;
+  private volatile boolean jobFailed = false;
 
   JobPriority priority = JobPriority.NORMAL;
   JobTracker jobtracker = null;
@@ -873,7 +875,7 @@
       return false;
     }
     // check if job has failed or killed
-    if (jobKilled) {
+    if (jobKilled || jobFailed) {
       return true;
     }
     // Check if all maps and reducers have finished.
@@ -1702,10 +1704,13 @@
       }
       //
       // The Job is done
-      //
-      // if the job is killed, then mark the job failed.
+      // if the job has failed, then mark the job failed.
+      if (jobFailed) {
+        terminateJob(JobStatus.FAILED);
+      }
+      // if the job is killed, then mark the job killed.
       if (jobKilled) {
-        killJob();
+        terminateJob(JobStatus.KILLED);
       }
       else {
         jobComplete(metrics);
@@ -1747,23 +1752,35 @@
     }
   }
   
-  private synchronized void killJob() {
+  private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
-      this.status = new JobStatus(status.getJobID(),
-                          1.0f, 1.0f, 1.0f, JobStatus.FAILED,
-                          status.getJobPriority());
-      this.finishTime = System.currentTimeMillis();
-      JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
-              this.finishedMapTasks, this.finishedReduceTasks);
+      if (jobTerminationState == JobStatus.FAILED) {
+        this.status = new JobStatus(status.getJobID(),
+                                    1.0f, 1.0f, 1.0f, JobStatus.FAILED,
+                                    status.getJobPriority());
+        this.finishTime = System.currentTimeMillis();
+        JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
+                                     this.finishedMapTasks, 
+                                     this.finishedReduceTasks);
+      } else {
+        this.status = new JobStatus(status.getJobID(),
+                                    1.0f, 1.0f, 1.0f, JobStatus.KILLED,
+                                    status.getJobPriority());
+        this.finishTime = System.currentTimeMillis();
+        JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
+                                     this.finishedMapTasks, 
+                                     this.finishedReduceTasks);
+      }
       garbageCollect();
     }
   }
 
   /**
-   * Kill the job and all its component tasks.
+   * Terminate the job and all its component tasks.
+   * @param jobTerminationState job termination state
    */
-  public synchronized void kill() {
+  private synchronized void terminate(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
          (status.getRunState() == JobStatus.PREP)) {
       LOG.info("Killing job '" + this.status.getJobID() + "'");
@@ -1778,11 +1795,29 @@
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
-      jobKilled = true;
+      if (jobTerminationState == JobStatus.FAILED) {
+        jobFailed = true;
+      } else if (jobTerminationState == JobStatus.KILLED) {
+        jobKilled = true;
+      }
     }
   }
   
   /**
+   * Kill the job and all its component tasks.
+   */
+  public synchronized void kill() {
+    terminate(JobStatus.KILLED);
+  }
+  
+  /**
+   * Fails the job and all its component tasks.
+   */
+  synchronized void fail() {
+    terminate(JobStatus.FAILED);
+  }
+  
+  /**
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after
    * many repeated failures we may instead decide to allow the entire 
@@ -1930,9 +1965,9 @@
           } else {
             cleanup[0].kill();
           }
-          killJob();
+          terminateJob(JobStatus.FAILED);
         } else {
-          kill();
+          fail();
         }
       }
       

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Sep 19 16:45:18
2008
@@ -46,6 +46,7 @@
   public static final int SUCCEEDED = 2;
   public static final int FAILED = 3;
   public static final int PREP = 4;
+  public static final int KILLED = 5;
 
   private JobID jobid;
   private float mapProgress;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Sep
19 16:45:18 2008
@@ -45,9 +45,10 @@
    *             cleanupProgress to JobStatus as part of HADOOP-3150
    * Version 13: Added getJobQueueInfos and getJobQueueInfo(queue name)
    *             and getAllJobs(queue) as a part of HADOOP-3930
-   * Version 14: Added setPriority for HADOOP-4124            
+   * Version 14: Added setPriority for HADOOP-4124
+   * Version 15: Added KILLED status to JobStatus as part of HADOOP-3924            
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep 19 16:45:18
2008
@@ -29,8 +29,8 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -1541,7 +1541,8 @@
               int rjobRunState = 
                 rjob.getStatus().getRunState();
               if (rjobRunState == JobStatus.SUCCEEDED || 
-                  rjobRunState == JobStatus.FAILED) {
+                  rjobRunState == JobStatus.FAILED ||
+                  rjobRunState == JobStatus.KILLED) {
                 // Ok, this call to removeTaskEntries
                 // is dangerous is some very very obscure
                 // cases; e.g. when rjob completed, hit
@@ -1628,7 +1629,8 @@
     for (Iterator it = jobs.values().iterator(); it.hasNext();) {
       JobInProgress jip = (JobInProgress) it.next();
       JobStatus status = jip.getStatus();
-      if (status.getRunState() == JobStatus.FAILED) {
+      if ((status.getRunState() == JobStatus.FAILED)
+          || (status.getRunState() == JobStatus.KILLED)) {
         v.add(jip);
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Sep 19 16:45:18
2008
@@ -109,7 +109,7 @@
    * @throws IOException
    */
   public boolean isSuccessful() throws IOException;
-
+  
   /**
    * Blocks until the job is complete.
    * 
@@ -118,6 +118,14 @@
   public void waitForCompletion() throws IOException;
 
   /**
+   * Returns the current run state of the job, as one of the
+   * {@link JobStatus} state constants.
+   * 
+   * @throws IOException
+   */
+  public int getJobState() throws IOException;
+  
+  /**
    * Kill the running job.  Blocks until all job tasks have been
    * killed as well.  If the job is no longer running, it simply returns.
    * 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri
Sep 19 16:45:18 2008
@@ -228,6 +228,7 @@
     submitJobs(1, JobStatus.PREP);
     submitJobs(1, JobStatus.SUCCEEDED);
     submitJobs(1, JobStatus.FAILED);
+    submitJobs(1, JobStatus.KILLED);
     assertNull(scheduler.assignTasks(tracker("tt1")));
   }
   

Modified: hadoop/core/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobdetails.jsp?rev=697286&r1=697285&r2=697286&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobdetails.jsp Fri Sep 19 16:45:18 2008
@@ -228,6 +228,13 @@
                   "<br>\n");
         out.print("<b>Failed in:</b> " + StringUtils.formatTimeDiff(
             job.getFinishTime(), job.getStartTime()) + "<br>\n");
+      } else if (runState == JobStatus.KILLED) {
+        out.print("<b>Status:</b> Killed<br>\n");
+        out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
+        out.print("<b>Killed at:</b> " + new Date(job.getFinishTime()) +
+                  "<br>\n");
+        out.print("<b>Killed in:</b> " + StringUtils.formatTimeDiff(
+            job.getFinishTime(), job.getStartTime()) + "<br>\n");
       }
     }
     out.print("<b>Job Cleanup:</b>");



Mime
View raw message