hadoop-mapreduce-commits mailing list archives

From ma...@apache.org
Subject svn commit: r804284 [4/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/fairscheduler/designdoc/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...
Date Fri, 14 Aug 2009 16:32:05 GMT
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Fri Aug 14 16:32:04 2009
@@ -48,13 +48,11 @@
       <p>
         The fair scheduler organizes jobs into <em>pools</em>, and 
         divides resources fairly between these pools. By default, there is a 
-        separate pool for each user, so that each user gets the same share 
-        of the cluster no matter how many jobs they submit. It is also
-        possible to set a job's pool based on the user's Unix group or
-        any jobconf property. 
-        Within each pool, fair sharing is used to divide capacity between 
-        the running jobs. Pools can also be given weights to share the 
-        cluster non-proportionally.
+        separate pool for each user, so that each user gets an equal share 
+        of the cluster. It is also possible to set a job's pool based on the
+        user's Unix group or any jobconf property. 
+        Within each pool, jobs can be scheduled using either fair sharing or 
+        first-in-first-out (FIFO) scheduling.
       </p>
       <p>
         In addition to providing fair sharing, the Fair Scheduler allows
@@ -65,19 +63,15 @@
         guaranteed share, the excess is split between other pools.
       </p>
       <p>
-        In normal operation, when a new job is submitted, the scheduler 
-        waits for tasks from existing jobs to finish in order to free up
-        slots for the new job. However, the scheduler also optionally supports
-        <em>preemption</em> of running jobs after configurable timeouts.
-        If the new job's minimum share is not reached after
-        a certain amount of time, the job is allowed to kill tasks from
-        existing jobs to make room to run.
-        Preemption can thus be used to guarantee
-        that "production" jobs run at specified times while allowing
+        If a pool's minimum share is not met for some period of time, the
+        scheduler optionally supports <em>preemption</em> of jobs in other
+        pools. The pool will be allowed to kill tasks from other pools to make
+        room to run. Preemption can be used to guarantee
+        that "production" jobs are not starved while allowing
         the Hadoop cluster to also be used for experimental and research jobs.
-        In addition, a job can also be allowed to preempt tasks if it is
+        In addition, a pool can also be allowed to preempt tasks if it is
         below half of its fair share for a configurable timeout (generally
-        set larger than the minimum share timeout).
+        set larger than the minimum share preemption timeout).
         When choosing tasks to kill, the fair scheduler picks the
         most-recently-launched tasks from over-allocated jobs, 
         to minimize wasted computation.
@@ -87,11 +81,11 @@
       <p>
         Finally, the Fair Scheduler can limit the number of concurrent
         running jobs per user and per pool. This can be useful when a 
-        user must submit hundreds of jobs at once, and for ensuring that
-        intermediate data does not fill up disk space on a cluster if too many
+        user must submit hundreds of jobs at once, or for ensuring that
+        intermediate data does not fill up disk space on a cluster when too many
         concurrent jobs are running.
-        Setting job limits causes jobs submitted beyond the limit to wait in the
-        scheduler's queue until some of the user/pool's earlier jobs finish.
+        Setting job limits causes jobs submitted beyond the limit to wait
+        until some of the user/pool's earlier jobs finish.
        Jobs to run from each user/pool are chosen in order of priority and then
        submission time.
       </p>
@@ -356,6 +350,9 @@
           <ul>
           <li><em>minMaps</em> and <em>minReduces</em>,
             to set the pool's minimum share of task slots.</li>
+          <li><em>schedulingMode</em>, the pool's internal scheduling mode,
+          which can be <em>fair</em> for fair sharing or <em>fifo</em> for
+          first-in-first-out.</li>
           <li><em>maxRunningJobs</em>, 
           to limit the number of jobs from the 
           pool to run at once (defaults to infinite).</li>
@@ -381,6 +378,9 @@
         <li><em>fairSharePreemptionTimeout</em>, 
         which sets the preemption timeout used when jobs are below half
         their fair share.</li>
+        <li><em>defaultPoolSchedulingMode</em>, which sets the default scheduling
+        mode (<em>fair</em> or <em>fifo</em>) for pools whose mode is
+        not specified.</li>
         </ul>
         <p>
        Pool and user elements are only required if you are setting
@@ -436,9 +436,9 @@
     </p> 
     <ol>
     <li>
-      It is possible to modify minimum shares, limits, weights and preemption
-      timeouts at runtime by editing the allocation file.
-      The scheduler will reload this file 10-15 seconds after it 
+      It is possible to modify minimum shares, limits, weights, preemption
+      timeouts and pool scheduling modes at runtime by editing the allocation
+      file. The scheduler will reload this file 10-15 seconds after it 
       sees that it was modified.
      </li>
      <li>
@@ -471,22 +471,16 @@
      <p>
      In addition, it is possible to view an "advanced" version of the web 
     UI by going to <em>http://&lt;JobTracker URL&gt;/scheduler?advanced</em>.
-     This view shows four more columns:
+     This view shows two more columns:
      </p>
      <ul>
     <li><em>Maps/Reduce Weight</em>: Weight of the job in the fair sharing
      calculations. This depends on priority and potentially also on 
      job size and job age if the <em>sizebasedweight</em> and 
      <em>NewJobWeightBooster</em> are enabled.</li>
-     <li><em>Map/Reduce Deficit</em>: The job's scheduling deficit in machine-
-     seconds - the amount of resources it should have gotten according to 
-     its fair share, minus how many it actually got. Positive deficit means
-      the job will be scheduled again in the near future because it needs to 
-      catch up to its fair share. The scheduler schedules jobs with higher 
-      deficit ahead of others. Please see the Implementation section of 
-      this document for details.</li>
      </ul>
     </section>
+    <!--
     <section>
     <title>Implementation</title>
     <p>There are two aspects to implementing fair scheduling: Calculating 
@@ -541,5 +535,6 @@
      <a href="#Advanced+Parameters">advanced mapred-site.xml properties</a>.
      </p>
     </section>
+    -->
   </body>  
 </document>

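The pool properties documented in the patch above can be combined into a single allocation file. A minimal sketch, assuming the element names introduced in this change (the pool name and numeric values are illustrative):

```xml
<?xml version="1.0"?>
<allocations>
  <!-- Default internal scheduling mode for pools that do not set one -->
  <defaultPoolSchedulingMode>fair</defaultPoolSchedulingMode>
  <!-- Global timeout for preemption when a job is below half its fair share -->
  <fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
  <pool name="production">
    <!-- Minimum share of task slots guaranteed to this pool -->
    <minMaps>10</minMaps>
    <minReduces>5</minReduces>
    <!-- Run this pool's own jobs first-in-first-out -->
    <schedulingMode>fifo</schedulingMode>
    <!-- Limit on concurrently running jobs from this pool -->
    <maxRunningJobs>20</maxRunningJobs>
  </pool>
</allocations>
```

Per the documentation diff, the scheduler would pick up edits to this file within 10-15 seconds, so scheduling modes and shares can be adjusted at runtime.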
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Aug 14 16:32:04 2009
@@ -1167,7 +1167,8 @@
    */
   public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
                                             int clusterSize, 
-                                            int numUniqueHosts
+                                            int numUniqueHosts,
+                                            int maxCacheLevel
                                            ) throws IOException {
     if (status.getRunState() != JobStatus.RUNNING) {
       LOG.info("Cannot create task split for " + profile.getJobID());
@@ -1175,7 +1176,7 @@
     }
        
     int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
-        anyCacheLevel);
+        maxCacheLevel);
     if (target == -1) {
       return null;
     }
@@ -1186,6 +1187,16 @@
     }
 
     return result;
+  } 
+  
+  /**
+   * Return a MapTask, if appropriate, to run on the given tasktracker
+   */
+  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
+                                            int clusterSize, 
+                                            int numUniqueHosts
+                                           ) throws IOException {
+    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel);
   }    
 
   /*
@@ -1234,18 +1245,8 @@
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-    }
-
-    return result;
+  
+    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
   }
   
   public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
@@ -1256,19 +1257,9 @@
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                NON_LOCAL_CACHE_LEVEL);
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-    }
-
-    return result;
+  
+    return obtainNewMapTask(tts, clusterSize, numUniqueHosts,
+        NON_LOCAL_CACHE_LEVEL);
   }
   
   /**
@@ -1539,23 +1530,7 @@
     // data locality.
    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       // increment the data locality counter for maps
-      Node tracker = jobtracker.getNode(tts.getHost());
-      int level = this.maxLevel;
-      // find the right level across split locations
-      for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
-        Node datanode = jobtracker.getNode(local);
-        int newLevel = this.maxLevel;
-        if (tracker != null && datanode != null) {
-          newLevel = getMatchingLevelForNodes(tracker, datanode);
-        }
-        if (newLevel < level) {
-          level = newLevel;
-          // an optimization
-          if (level == 0) {
-            break;
-          }
-        }
-      }
+      int level = getLocalityLevel(tip, tts);
       switch (level) {
       case 0 :
         LOG.info("Choosing data-local task " + tip.getTIPId());
@@ -3270,6 +3245,33 @@
   }
   
   /**
+   * Get the level of locality that a given task would have if launched on
+   * a particular TaskTracker. Returns 0 if the task has data on that machine,
+   * 1 if it has data on the same rack, etc (depending on number of levels in
+   * the network hierarchy).
+   */
+  int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
+    Node tracker = jobtracker.getNode(tts.getHost());
+    int level = this.maxLevel;
+    // find the right level across split locations
+    for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
+      Node datanode = jobtracker.getNode(local);
+      int newLevel = this.maxLevel;
+      if (tracker != null && datanode != null) {
+        newLevel = getMatchingLevelForNodes(tracker, datanode);
+      }
+      if (newLevel < level) {
+        level = newLevel;
+        // an optimization
+        if (level == 0) {
+          break;
+        }
+      }
+    }
+    return level;
+  }
+  
+  /**
    * Test method to set the cluster sizes
    */
   void setClusterSize(int clusterSize) {



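The `getLocalityLevel()` helper extracted in this commit can be exercised in isolation. A minimal standalone sketch, with the caveat that the class and method names here are illustrative and network locations are simplified to slash-separated path strings rather than Hadoop's `Node` objects:

```java
import java.util.Arrays;
import java.util.List;

public class LocalitySketch {
    // Matching level between two network paths like "/rack1/h1":
    // 0 = same node, 1 = same rack, 2 = off-rack (two-level hierarchy).
    static int matchingLevel(String tracker, String datanode) {
        String[] a = tracker.substring(1).split("/");
        String[] b = datanode.substring(1).split("/");
        int depth = Math.max(a.length, b.length);
        int common = 0;
        while (common < Math.min(a.length, b.length)
               && a[common].equals(b[common])) {
            common++;
        }
        return depth - common;
    }

    // Mirrors the extracted getLocalityLevel(): take the best (lowest)
    // level across all of the task's split locations, capped at maxLevel,
    // with the same early exit once a node-local match is found.
    static int localityLevel(String tracker, List<String> splits, int maxLevel) {
        int level = maxLevel;
        for (String local : splits) {
            int newLevel = matchingLevel(tracker, local);
            if (newLevel < level) {
                level = newLevel;
                if (level == 0) {
                    break; // node-local; cannot improve
                }
            }
        }
        return level;
    }

    public static void main(String[] args) {
        if (localityLevel("/rack1/h1", Arrays.asList("/rack1/h1", "/rack2/h3"), 2) != 0)
            throw new AssertionError("expected node-local");
        if (localityLevel("/rack1/h1", Arrays.asList("/rack1/h2"), 2) != 1)
            throw new AssertionError("expected rack-local");
        if (localityLevel("/rack1/h1", Arrays.asList("/rack2/h3"), 2) != 2)
            throw new AssertionError("expected off-rack");
        System.out.println("ok");
    }
}
```

Factoring this loop out of `updateTaskStatus`'s locality counters into a shared helper means other callers, such as a pluggable scheduler, can presumably query a task's locality on a given tracker without duplicating the traversal.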