hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r555770 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/webapps/job/
Date Thu, 12 Jul 2007 21:11:31 GMT
Author: tomwhite
Date: Thu Jul 12 14:11:30 2007
New Revision: 555770

URL: http://svn.apache.org/viewvc?view=rev&rev=555770
Log:
HADOOP-1433.  Add job priority.  Contributed by Johan Oskarsson.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555770&r1=555769&r2=555770
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jul 12 14:11:30 2007
@@ -346,6 +346,8 @@
 107. HADOOP-1570.  Permit jobs to enable and disable the use of
      hadoop's native library.  (Arun C Murthy via cutting)
 
+108. HADOOP-1433.  Add job priority.  (Johan Oskarsson via tomwhite)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=555770&r1=555769&r2=555770
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Jul 12 14:11:30
2007
@@ -686,6 +686,28 @@
     setInt("mapred.max.reduce.failures.percent", percent);
   }
   
+  /**
+   * Set job priority for this job.
+   * 
+   * @param prio
+   */
+  public void setJobPriority(JobPriority prio) {
+    set("mapred.job.priority", prio.toString());
+  }
+  
+  /**
+   * Get the job priority for this job.
+   */
+  public JobPriority getJobPriority() {
+    String prio = get("mapred.job.priority");
+    if(prio == null) {
+      return JobPriority.NORMAL;
+    }
+    
+    return JobPriority.valueOf(prio);
+  }
+  
+  
   /** Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
    * on the class path that has a class with the same name.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=555770&r1=555769&r2=555770
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul 12 14:11:30
2007
@@ -69,7 +69,8 @@
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
-  
+
+  JobPriority priority = JobPriority.NORMAL;
   JobTracker jobtracker = null;
   Map<String,List<TaskInProgress>> hostToMaps =
     new HashMap<String,List<TaskInProgress>>();
@@ -133,6 +134,7 @@
     FileSystem fs = FileSystem.get(default_conf);
     fs.copyToLocalFile(new Path(jobFile), localJobFile);
     conf = new JobConf(localJobFile);
+    this.priority = conf.getJobPriority();
     this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url,
                                   conf.getJobName());
     String jarFile = conf.getJar();
@@ -298,6 +300,16 @@
   }
   public synchronized int finishedReduces() {
     return finishedReduceTasks;
+  }
+  public JobPriority getPriority() {
+    return this.priority;
+  }
+  public void setPriority(JobPriority priority) {
+    if(priority == null) {
+      this.priority = JobPriority.NORMAL;
+    } else {
+      this.priority = priority;
+    }
   }
  
   /**

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java?view=auto&rev=555770
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java Thu Jul 12 14:11:30
2007
@@ -0,0 +1,15 @@
+package org.apache.hadoop.mapred;
+
+/**
+ * Used to describe the priority of the running job. 
+ *
+ */
+public enum JobPriority {
+
+  VERY_HIGH,
+  HIGH,
+  NORMAL,
+  LOW,
+  VERY_LOW;
+  
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=555770&r1=555769&r2=555770
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Jul 12 14:11:30
2007
@@ -24,6 +24,7 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
@@ -335,8 +336,8 @@
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
           long retireBefore = System.currentTimeMillis() - 
             RETIRE_JOB_INTERVAL;
-          synchronized (jobsByArrival) {
-            for(JobInProgress job: jobsByArrival) {
+          synchronized (jobsByPriority) {
+            for(JobInProgress job: jobsByPriority) {
               if (job.getStatus().getRunState() != JobStatus.RUNNING &&
                   job.getStatus().getRunState() != JobStatus.PREP &&
                   (job.getFinishTime()  < retireBefore)) {
@@ -347,13 +348,13 @@
           if (!retiredJobs.isEmpty()) {
             synchronized (JobTracker.this) {
               synchronized (jobs) {
-                synchronized (jobsByArrival) {
+                synchronized (jobsByPriority) {
                   synchronized (jobInitQueue) {
                     for (JobInProgress job: retiredJobs) {
                       removeJobTasks(job);
                       jobs.remove(job.getProfile().getJobId());
                       jobInitQueue.remove(job);
-                      jobsByArrival.remove(job);
+                      jobsByPriority.remove(job);
                       String jobUser = job.getProfile().getUser();
                       synchronized (userToJobsMap) {
                         ArrayList<JobInProgress> userJobs =
@@ -518,7 +519,7 @@
 
   // All the known jobs.  (jobid->JobInProgress)
   Map<String, JobInProgress> jobs = new TreeMap<String, JobInProgress>();
-  List<JobInProgress> jobsByArrival = new ArrayList<JobInProgress>();
+  List<JobInProgress> jobsByPriority = new ArrayList<JobInProgress>();
 
   // (user -> list of JobInProgress)
   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
@@ -904,7 +905,7 @@
     // in memory; information about the purged jobs is available via
     // JobHistory.
     synchronized (jobs) {
-      synchronized (jobsByArrival) {
+      synchronized (jobsByPriority) {
         synchronized (jobInitQueue) {
           synchronized (userToJobsMap) {
             String jobUser = job.getProfile().getUser();
@@ -945,7 +946,7 @@
                   userJobs.remove(0);
                   jobs.remove(rjob.getProfile().getJobId());
                   jobInitQueue.remove(rjob);
-                  jobsByArrival.remove(rjob);
+                  jobsByPriority.remove(rjob);
                     
                   LOG.info("Retired job with id: '" + 
                            rjob.getProfile().getJobId() + "' of user: '" +
@@ -1261,8 +1262,8 @@
     }
     int totalCapacity = numTaskTrackers * maxCurrentTasks;
 
-    synchronized(jobsByArrival){
-      for (Iterator it = jobsByArrival.iterator(); it.hasNext();) {
+    synchronized(jobsByPriority){
+      for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
         JobInProgress job = (JobInProgress) it.next();
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
           int totalMapTasks = job.desiredMaps();
@@ -1306,11 +1307,11 @@
     // task.
     //
        
-    synchronized (jobsByArrival) {
+    synchronized (jobsByPriority) {
       if (numMaps < maxMapLoad) {
 
         int totalNeededMaps = 0;
-        for (Iterator it = jobsByArrival.iterator(); it.hasNext();) {
+        for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
           JobInProgress job = (JobInProgress) it.next();
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
@@ -1346,7 +1347,7 @@
       if (numReduces < maxReduceLoad) {
 
         int totalNeededReduces = 0;
-        for (Iterator it = jobsByArrival.iterator(); it.hasNext();) {
+        for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
           JobInProgress job = (JobInProgress) it.next();
           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
               job.numReduceTasks == 0) {
@@ -1453,17 +1454,44 @@
     totalSubmissions++;
     JobInProgress job = new JobInProgress(jobFile, this, this.conf);
     synchronized (jobs) {
-      synchronized (jobsByArrival) {
+      synchronized (jobsByPriority) {
         synchronized (jobInitQueue) {
           jobs.put(job.getProfile().getJobId(), job);
-          jobsByArrival.add(job);
+          jobsByPriority.add(job);
           jobInitQueue.add(job);
+          resortPriority();
           jobInitQueue.notifyAll();
         }
       }
     }
     myMetrics.submitJob();
     return job.getStatus();
+  }
+
+  /**
+   * Sort jobs by priority and then by start time.
+   */
+  public void resortPriority() {
+    Comparator<JobInProgress> comp = new Comparator<JobInProgress>() {
+      public int compare(JobInProgress o1, JobInProgress o2) {
+        int res = o1.getPriority().compareTo(o2.getPriority());
+        if(res == 0) {
+          if(o1.getStartTime() < o2.getStartTime())
+            res = -1;
+          else
+            res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1);
+        }
+          
+        return res;
+      }
+    };
+    
+    synchronized(jobsByPriority) {
+      Collections.sort(jobsByPriority, comp);
+    }
+    synchronized (jobInitQueue) {
+      Collections.sort(jobInitQueue, comp);
+    }
   }
 
   public synchronized ClusterStatus getClusterStatus() {

Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=555770&r1=555769&r2=555770
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu Jul 12 14:11:30 2007
@@ -102,8 +102,14 @@
     
     JobInProgress job = (JobInProgress) tracker.getJob(jobId);
     
+    String action = request.getParameter("action");
+    if("changeprio".equalsIgnoreCase(action)) {
+	  job.setPriority(JobPriority.valueOf(request.getParameter("prio")));
+	  tracker.resortPriority();
+    }
+    
     if(JspHelper.conf.getBoolean(PRIVATE_ACTIONS_KEY, false)) {
-    	String action = request.getParameter("action");
+        action = request.getParameter("action");
 	    if(action!=null && action.equalsIgnoreCase("confirm")) {
   	      printConfirm(out, jobId);
     	    return;
@@ -234,9 +240,20 @@
     </table>
 
 
+<hr>Change priority from <%=job.getPriority()%> to: 
+<%
+  JobPriority jobPrio = job.getPriority();
+  for (JobPriority prio : JobPriority.values()) {
+    if(jobPrio != prio) {
+      %><a href="jobdetails.jsp?action=changeprio&jobid=<%=jobId%>&prio=<%=prio%>">
<%=prio%> </a><%
+    }
+  }
+%>
+</br>
+    
 <% if(JspHelper.conf.getBoolean(PRIVATE_ACTIONS_KEY, false) 
     	&& runState == JobStatus.RUNNING) { %>
-	<hr><a href="jobdetails.jsp?action=confirm&jobid=<%=jobId%>"> Kill
this job </a>
+	<br/><a href="jobdetails.jsp?action=confirm&jobid=<%=jobId%>"> Kill
this job </a>
 <% } %>
 <hr>
 <a href="jobtracker.jsp">Go back to JobTracker</a><br>



Mime
View raw message