hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1169585 [5/5] - in /hadoop/common/branches/branch-0.20-security: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/co...
Date: Sun, 11 Sep 2011 23:57:38 GMT
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerSystem.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerSystem.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerSystem.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.TestSleepJob;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.HttpURLConnection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+
+/**
+ * System tests for the fair scheduler. These run slower than the
+ * mock-based tests in TestFairScheduler but have a better chance
+ * of catching synchronization bugs with the real JT.
+ *
+ * This test suite will often be run inside JCarder in order to catch
+ * deadlock bugs which have plagued the scheduler in the past - hence
+ * it is a bit of a "grab-bag" of system tests, since it's important
+ * that they all run as part of the same JVM instantiation.
+ */
+public class TestFairSchedulerSystem {
+  static final int NUM_THREADS=2;
+
+  static MiniMRCluster mr;
+  static JobConf conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new JobConf();
+    final int taskTrackers = 1;
+
+    // Bump up the frequency of preemption updates to test against
+    // deadlocks, etc.
+    conf.set("mapred.jobtracker.taskScheduler", FairScheduler.class.getCanonicalName());
+    conf.set("mapred.fairscheduler.update.interval", "1");
+    conf.set("mapred.fairscheduler.preemption.interval", "1");
+    conf.set("mapred.fairscheduler.preemption", "true");
+    conf.set("mapred.fairscheduler.eventlog.enabled", "true");
+    conf.set("mapred.fairscheduler.poolnameproperty", "group.name");
+    conf.set("mapred.job.tracker.persist.jobstatus.active", "false");
+    mr = new MiniMRCluster(taskTrackers, "file:///", 1, null, null, conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+
+  private void runSleepJob(JobConf conf) throws Exception {
+    String[] args = { "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" };
+    ToolRunner.run(conf, new TestSleepJob(), args);
+  }
+
+  /**
+   * Submit some concurrent sleep jobs, and visit the scheduler servlet
+   * while they're running.
+   */
+  @Test
+  public void testFairSchedulerSystem() throws Exception {
+    ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future<Void>> futures = new ArrayList<Future<Void>>(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futures.add(exec.submit(new Callable<Void>() {
+            public Void call() throws Exception {
+              JobConf jobConf = mr.createJobConf();
+              runSleepJob(jobConf);
+              return null;
+            }
+          }));
+    }
+
+    JobClient jc = new JobClient(mr.createJobConf(null));
+
+    // Wait for the tasks to finish, and visit the scheduler servlet
+    // every few seconds while waiting.
+    for (Future<Void> future : futures) {
+      while (true) {
+        try {
+          future.get(3, TimeUnit.SECONDS);
+          break;
+        } catch (TimeoutException te) {
+          // It's OK
+        }
+        checkServlet(true);
+        checkServlet(false);
+
+        JobStatus jobs[] = jc.getAllJobs();
+        if (jobs == null) {
+          System.err.println("No jobs running, not checking task graph servlet");
+          continue;
+        }
+        for (JobStatus j : jobs) {
+          System.err.println("Checking task graph for " + j.getJobID());
+          try {
+            checkTaskGraphServlet(j.getJobID());
+          } catch (AssertionError err) {
+            // The task graph servlet will be empty if the job has retired.
+            // This is OK.
+            RunningJob rj = jc.getJob(j.getJobID());
+            if (!rj.isComplete()) {
+              throw err;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Check the fair scheduler servlet for good status code and smoke test
+   * for contents.
+   */
+  private void checkServlet(boolean advanced) throws Exception {
+    String jtURL = "http://localhost:" +
+      mr.getJobTrackerRunner().getJobTrackerInfoPort();
+    URL url = new URL(jtURL + "/scheduler" +
+                      (advanced ? "?advanced" : ""));
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod("GET");
+    connection.connect();
+    assertEquals(200, connection.getResponseCode());
+
+    // Just to be sure, slurp the content and make sure it looks like the scheduler
+    BufferedReader reader = new BufferedReader(
+      new InputStreamReader(connection.getInputStream()));
+    StringBuilder sb = new StringBuilder();
+
+    String line = null;
+    while ((line = reader.readLine()) != null) {
+      sb.append(line).append('\n');
+    }
+
+    String contents = sb.toString();
+    assertTrue("Bad contents for fair scheduler servlet: " + contents,
+      contents.contains("Fair Scheduler Administration"));
+  }
+
+  private void checkTaskGraphServlet(JobID job) throws Exception {
+    String jtURL = "http://localhost:" +
+      mr.getJobTrackerRunner().getJobTrackerInfoPort();
+    URL url = new URL(jtURL + "/taskgraph?jobid=" + job.toString() + "&type=map");
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod("GET");
+    connection.connect();
+    assertEquals(200, connection.getResponseCode());
+
+    // Just to be sure, slurp the content and make sure it contains the SVG task graph
+    String contents = slurpContents(connection);
+    assertTrue("Bad contents for job " + job + ":\n" + contents,
+      contents.contains("</svg>"));
+  }
+
+  private String slurpContents(HttpURLConnection connection) throws Exception {
+    BufferedReader reader = new BufferedReader(
+      new InputStreamReader(connection.getInputStream()));
+    StringBuilder sb = new StringBuilder();
+
+    String line = null;
+    while ((line = reader.readLine()) != null) {
+      sb.append(line).append('\n');
+    }
+
+    return sb.toString();
+  }
+}

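The test above relies on the group.name pool property set in setUp(). For reference, a single job can instead be pinned to a named pool through the mapred.fairscheduler.pool property described in the fair_scheduler.xml changes below; a minimal sketch reusing the test's MiniMRCluster and sleep-job arguments (the pool name "pool_a" is illustrative and not part of this commit):

    JobConf jobConf = mr.createJobConf();
    // mapred.fairscheduler.pool takes precedence over the poolnameproperty setting.
    jobConf.set("mapred.fairscheduler.pool", "pool_a");  // illustrative pool name
    ToolRunner.run(jobConf, new TestSleepJob(),
        new String[] { "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" });
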
Modified: hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Sun Sep 11 23:57:37 2011
@@ -18,16 +18,15 @@
 <!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
 <document>
   <header>
-    <title>Fair Scheduler Guide</title>
+    <title>Fair Scheduler</title>
   </header>
   <body>
 
     <section>
       <title>Purpose</title>
 
-      <p>This document describes the Fair Scheduler, a pluggable
-        MapReduce scheduler for Hadoop which provides a way to share
-        large clusters.</p>
+      <p>This document describes the Fair Scheduler, a pluggable MapReduce
+        scheduler that provides a way to share large clusters.</p>
     </section>
 
     <section>
@@ -39,52 +38,62 @@
         free up are assigned to the new jobs, so that each job gets
         roughly the same amount of CPU time. Unlike the default Hadoop
         scheduler, which forms a queue of jobs, this lets short jobs finish
-        in reasonable time while not starving long jobs. It is also a 
-        reasonable way to share a cluster between a number of users. Finally, 
-        fair sharing can also work with job priorities - the priorities are
+        in reasonable time while not starving long jobs. It is also an easy
+        way to share a cluster between multiple users.
+        Fair sharing can also work with job priorities - the priorities are
         used as weights to determine the fraction of total compute time that
-        each job should get.
+        each job gets.
       </p>
       <p>
-        The scheduler actually organizes jobs further into "pools", and 
-        shares resources fairly between these pools. By default, there is a 
-        separate pool for each user, so that each user gets the same share 
-        of the cluster no matter how many jobs they submit. However, it is 
-        also possible to set a job's pool based on the user's Unix group or
-        any other jobconf property, such as the queue name property used by 
-        <a href="capacity_scheduler.html">Capacity Scheduler</a>. 
-        Within each pool, fair sharing is used to share capacity between 
-        the running jobs. Pools can also be given weights to share the 
-        cluster non-proportionally in the config file.
+        The fair scheduler organizes jobs into <em>pools</em>, and 
+        divides resources fairly between these pools. By default, there is a 
+        separate pool for each user, so that each user gets an equal share 
+        of the cluster. It is also possible to set a job's pool based on the
+        user's Unix group or any jobconf property. 
+        Within each pool, jobs can be scheduled using either fair sharing or 
+        first-in-first-out (FIFO) scheduling.
       </p>
       <p>
         In addition to providing fair sharing, the Fair Scheduler allows
-        assigning guaranteed minimum shares to pools, which is useful for
-        ensuring that certain users, groups or production applications
+        assigning guaranteed <em>minimum shares</em> to pools, which is useful
+        for ensuring that certain users, groups or production applications
         always get sufficient resources. When a pool contains jobs, it gets
         at least its minimum share, but when the pool does not need its full
-        guaranteed share, the excess is split between other running jobs.
-        This lets the scheduler guarantee capacity for pools while utilizing
-        resources efficiently when these pools don't contain jobs.       
-      </p>
-      <p>
-        The Fair Scheduler lets all jobs run by default, but it is also
-        possible to limit the number of running jobs per user and per pool
-        through the config file. This can be useful when a user must submit
-        hundreds of jobs at once, or in general to improve performance if
-        running too many jobs at once would cause too much intermediate data
-        to be created or too much context-switching. Limiting the jobs does
-        not cause any subsequently submitted jobs to fail, only to wait in the
-        sheduler's queue until some of the user's earlier jobs finish. Jobs to
-        run from each user/pool are chosen in order of priority and then
-        submit time, as in the default FIFO scheduler in Hadoop.
-      </p>
-      <p>
-        Finally, the fair scheduler provides several extension points where
-        the basic functionality can be extended. For example, the weight
-        calculation can be modified to give a priority boost to new jobs,
-        implementing a "shortest job first" policy which reduces response
-        times for interactive jobs even further.
+        guaranteed share, the excess is split between other pools.
+      </p>
+      <p>
+        If a pool's minimum share is not met for some period of time, the
+        scheduler optionally supports <em>preemption</em> of jobs in other
+        pools. The pool will be allowed to kill tasks from other pools to make
+        room to run. Preemption can be used to guarantee
+        that "production" jobs are not starved while also allowing
+        the Hadoop cluster to be used for experimental and research jobs.
+        In addition, a pool can also be allowed to preempt tasks if it is
+        below half of its fair share for a configurable timeout (generally
+        set larger than the minimum share preemption timeout).
+        When choosing tasks to kill, the fair scheduler picks the
+        most-recently-launched tasks from over-allocated jobs, 
+        to minimize wasted computation.
+        Preemption does not cause the preempted jobs to fail, because Hadoop
+        jobs tolerate losing tasks; it only makes them take longer to finish.
+      </p>
+      <p>
+        The Fair Scheduler can limit the number of concurrent
+        running jobs per user and per pool. This can be useful when a 
+        user must submit hundreds of jobs at once, or for ensuring that
+        intermediate data does not fill up disk space on a cluster when too many
+        concurrent jobs are running.
+        Setting job limits causes jobs submitted beyond the limit to wait
+        until some of the user/pool's earlier jobs finish.
+        Jobs to run from each user/pool are chosen in order of priority and then
+        submit time.
+      </p>
+      <p>
+        Finally, the Fair Scheduler can limit the number of concurrent
+        running tasks per pool. This can be useful when jobs have a
+        dependency on an external service like a database or web
+        service that could be overloaded if too many map or reduce
+        tasks are run at once.
       </p>
     </section>
 
@@ -93,184 +102,367 @@
       <p>
         To run the fair scheduler in your Hadoop installation, you need to put
         it on the CLASSPATH. The easiest way is to copy the 
-        <em>hadoop-fairscheduler-*.jar</em> from
-        <em>HADOOP_HOME/contrib/fairscheduler</em> to <em>HADOOP_HOME/lib</em>.
+        <em>hadoop-*-fairscheduler.jar</em> from
+        <em>HADOOP_HOME/build/contrib/fairscheduler</em> to <em>HADOOP_HOME/lib</em>.
         Alternatively you can modify <em>HADOOP_CLASSPATH</em> to include this jar, in
         <em>HADOOP_CONF_DIR/hadoop-env.sh</em>
       </p>
       <p>
-        In order to compile fair scheduler, from sources execute <em> ant 
-        package</em> in source folder and copy the 
-        <em>build/contrib/fair-scheduler/hadoop-fairscheduler-*.jar</em> 
-        to <em>HADOOP_HOME/lib</em>
-      </p>
-      <p>
        You will also need to set the following property in the Hadoop config 
        file  <em>HADOOP_CONF_DIR/mapred-site.xml</em> to have Hadoop use 
-       the fair scheduler: <br/>
-       <code>&lt;property&gt;</code><br/> 
-       <code>&nbsp;&nbsp;&lt;name&gt;mapred.jobtracker.taskScheduler&lt;/name&gt;</code><br/>
-       <code>&nbsp;&nbsp;&lt;value&gt;org.apache.hadoop.mapred.FairScheduler&lt;/value&gt;</code><br/>
-       <code>&lt;/property&gt;</code>
+       the fair scheduler:
       </p>
+<source>
+&lt;property&gt;
+  &lt;name&gt;mapred.jobtracker.taskScheduler&lt;/name&gt;
+  &lt;value&gt;org.apache.hadoop.mapred.FairScheduler&lt;/value&gt;
+&lt;/property&gt;
+</source>
       <p>
         Once you restart the cluster, you can check that the fair scheduler 
-        is running by going to http://&lt;jobtracker URL&gt;/scheduler 
+        is running by going to <em>http://&lt;jobtracker URL&gt;/scheduler</em> 
         on the JobTracker's web UI. A &quot;job scheduler administration&quot; page should 
         be visible there. This page is described in the Administration section.
       </p>
+      <p>
+        If you wish to compile the fair scheduler from source, run <em> ant 
+        package</em> in your HADOOP_HOME directory. This will build
+        <em>build/contrib/fair-scheduler/hadoop-*-fairscheduler.jar</em>.
+      </p>
     </section>
     
     <section>
-      <title>Configuring the Fair scheduler</title>
+      <title>Configuration</title>
       <p>
-      The following properties can be set in mapred-site.xml to configure 
-      the fair scheduler:
+        The Fair Scheduler contains configuration in two places -- algorithm
+        parameters are set in <em>HADOOP_CONF_DIR/mapred-site.xml</em>, while 
+        a separate XML file called the <em>allocation file</em>, 
+        located by default in
+        <em>HADOOP_CONF_DIR/fair-scheduler.xml</em>, is used to configure
+        pools, minimum shares, running job limits and preemption timeouts.
+        The allocation file is reloaded periodically at runtime, 
+        allowing you to change pool settings without restarting 
+        your Hadoop cluster.
       </p>
-      <table>
-        <tr>
-        <th>Name</th><th>Description</th>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.allocation.file
-        </td>
-        <td>
-          Specifies an absolute path to an XML file which contains the 
-          allocations for each pool, as well as the per-pool and per-user 
-          limits on number of running jobs. If this property is not 
-          provided, allocations are not used.<br/>
-          This file must be in XML format, and can contain three types of 
-          elements:
+      <p>
+        For a minimal installation, to just get equal sharing between users,
+        you will not need to edit the allocation file.
+      </p>
+      <section>
+      <title>Scheduler Parameters in mapred-site.xml</title>
+        <p>
+          The following parameters can be set in <em>mapred-site.xml</em>
+          to affect the behavior of the fair scheduler:
+        </p>
+        <p><strong>Basic Parameters</strong></p>
+        <table>
+          <tr>
+          <th>Name</th><th>Description</th>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.preemption
+          </td>
+          <td>
+            Boolean property for enabling preemption. Default: false.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.pool
+          </td>
+          <td>
+            Specify the pool that a job belongs in.  
+            If this is specified then mapred.fairscheduler.poolnameproperty is ignored.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.poolnameproperty
+          </td>
+          <td>
+            Specify which jobconf property is used to determine the pool that a
+            job belongs in. String, default: <em>user.name</em>
+            (i.e. one pool for each user). 
+            Another useful value is <em>mapred.job.queue.name</em> to use MapReduce's "queue"
+            system for access control lists (see below).
+            mapred.fairscheduler.poolnameproperty is used only for jobs in which 
+            mapred.fairscheduler.pool is not explicitly set.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.allocation.file
+          </td>
+          <td>
+            Can be used to have the scheduler use a different allocation file
+            than the default one (<em>HADOOP_CONF_DIR/fair-scheduler.xml</em>).
+            Must be an absolute path to the allocation file.
+          </td>
+          </tr>
+        </table>
+        <p> <br></br></p>
+        <p><strong>Advanced Parameters</strong> </p>
+        <table>
+          <tr>
+          <th>Name</th><th>Description</th>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.sizebasedweight
+          </td>
+          <td>
+            Take into account job sizes in calculating their weights for fair 
+            sharing. By default, weights are only based on job priorities. 
+            Setting this flag to true will make them based on the size of the 
+            job (number of tasks needed) as well, though not linearly 
+            (the weight will be proportional to the log of the number of tasks 
+            needed). This lets larger jobs get larger fair shares while still 
+            providing enough of a share to small jobs to let them finish fast. 
+            Boolean value, default: false.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.preemption.only.log
+          </td>
+          <td>
+            This flag will cause the scheduler to run through the preemption
+            calculations but simply log when it wishes to preempt a task,
+            without actually preempting the task. 
+            Boolean property, default: false.
+            This property can be useful for
+            doing a "dry run" of preemption before enabling it to make sure
+            that you have not set timeouts too aggressively.
+            You will see preemption log messages in your JobTracker's output
+            log (<em>HADOOP_LOG_DIR/hadoop-jobtracker-*.log</em>).
+            The messages look as follows:<br/>
+            <code>Should preempt 2 tasks for job_20090101337_0001: tasksDueToMinShare = 2, tasksDueToFairShare = 0</code>
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.update.interval
+          </td>
+          <td>
+            Interval at which to update fair share calculations. The default
+            of 500ms works well for clusters with fewer than 500 nodes, 
+            but larger values reduce load on the JobTracker for larger clusters.
+            Integer value in milliseconds, default: 500.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.preemption.interval
+          </td>
+          <td>
+            Interval at which to check for tasks to preempt. The default
+            of 15s works well for timeouts on the order of minutes.
+            It is not recommended to set timeouts much smaller than this
+            amount, but you can use this value to make preemption computations
+            run more often if you do set such timeouts. A value of less than
+            5s will probably be too small, however, as it becomes less than
+            the inter-heartbeat interval.
+            Integer value in milliseconds, default: 15000.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.weightadjuster
+          </td>
+          <td>
+          An extension point that lets you specify a class to adjust the 
+          weights of running jobs. This class should implement the 
+          <em>WeightAdjuster</em> interface. There is currently one example 
+          implementation - <em>NewJobWeightBooster</em>, which increases the 
+          weight of jobs for the first 5 minutes of their lifetime to let 
+          short jobs finish faster. To use it, set the weightadjuster 
+          property to the full class name, 
+          <code>org.apache.hadoop.mapred.NewJobWeightBooster</code>.
+          NewJobWeightBooster itself provides two parameters for setting the 
+          duration and boost factor.
           <ul>
-          <li>pool elements, which may contain elements for minMaps, 
-          minReduces, maxRunningJobs (limit the number of jobs from the 
-          pool to run at once),and weight (to share the cluster 
-          non-proportionally with other pools).
-          </li>
-          <li>user elements, which may contain a maxRunningJobs to limit 
-          jobs. Note that by default, there is a separate pool for each 
-          user, so these may not be necessary; they are useful, however, 
-          if you create a pool per user group or manually assign jobs 
-          to pools.</li>
-          <li>A userMaxJobsDefault element, which sets the default running 
-          job limit for any users whose limit is not specified.</li>
+          <li><em>mapred.newjobweightbooster.factor</em>
+            Factor by which new jobs weight should be boosted. 
+            Default is 3.</li>
+          <li><em>mapred.newjobweightbooster.duration</em>
+            Boost duration in milliseconds. Default is 300000 for 5 minutes.</li>
           </ul>
-          <br/>
-          Example Allocation file is listed below :<br/>
-          <code>&lt;?xml version="1.0"?&gt; </code> <br/>
-          <code>&lt;allocations&gt;</code> <br/> 
-          <code>&nbsp;&nbsp;&lt;pool name="sample_pool"&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minMaps&gt;5&lt;/minMaps&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minReduces&gt;5&lt;/minReduces&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;weight&gt;2.0&lt;/weight&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;/pool&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;user name="sample_user"&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;maxRunningJobs&gt;6&lt;/maxRunningJobs&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;/user&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;userMaxJobsDefault&gt;3&lt;/userMaxJobsDefault&gt;</code><br/>
-          <code>&lt;/allocations&gt;</code>
-          <br/>
-          This example creates a pool sample_pool with a guarantee of 5 map 
-          slots and 5 reduce slots. The pool also has a weight of 2.0, meaning 
-          it has a 2x higher share of the cluster than other pools (the default 
-          weight is 1). Finally, the example limits the number of running jobs 
-          per user to 3, except for sample_user, who can run 6 jobs concurrently. 
-          Any pool not defined in the allocations file will have no guaranteed 
-          capacity and a weight of 1.0. Also, any pool or user with no max 
-          running jobs set in the file will be allowed to run an unlimited 
-          number of jobs.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.assignmultiple
-        </td>
-        <td>
-          Allows the scheduler to assign both a map task and a reduce task 
-          on each heartbeat, which improves cluster throughput when there 
-          are many small tasks to run. Boolean value, default: false.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.sizebasedweight
-        </td>
-        <td>
-          Take into account job sizes in calculating their weights for fair 
-          sharing.By default, weights are only based on job priorities. 
-          Setting this flag to true will make them based on the size of the 
-          job (number of tasks needed) as well,though not linearly 
-          (the weight will be proportional to the log of the number of tasks 
-          needed). This lets larger jobs get larger fair shares while still 
-          providing enough of a share to small jobs to let them finish fast. 
-          Boolean value, default: false.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.poolnameproperty
-        </td>
-        <td>
-          Specify which jobconf property is used to determine the pool that a
-          job belongs in. String, default: user.name (i.e. one pool for each 
-          user). Some other useful values to set this to are: <br/>
-          <ul> 
-            <li> group.name (to create a pool per Unix group).</li>
-            <li>mapred.job.queue.name (the same property as the queue name in 
-            <a href="capacity_scheduler.html">Capacity Scheduler</a>).</li>
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.loadmanager
+          </td>
+          <td>
+            An extension point that lets you specify a class that determines 
+            how many maps and reduces can run on a given TaskTracker. This class 
+            should implement the LoadManager interface. By default the task caps 
+            in the Hadoop config file are used, but this option could be used to 
+            base the load on available memory and CPU utilization, for example.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.taskselector
+          </td>
+          <td>
+          An extension point that lets you specify a class that determines 
+          which task from within a job to launch on a given tracker. This can be 
+          used to change either the locality policy (e.g. keep some jobs within 
+          a particular rack) or the speculative execution algorithm (select 
+          when to launch speculative tasks). The default implementation uses 
+          Hadoop's default algorithms from JobInProgress.
+          </td>
+          </tr>
+          <!--
+          <tr>
+          <td>
+            mapred.fairscheduler.eventlog.enabled
+          </td>
+          <td>
+            Enable a detailed log of fair scheduler events, useful for
+            debugging.
+            This log is stored in <em>HADOOP_LOG_DIR/fairscheduler</em>.
+            Boolean value, default: false.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.dump.interval
+          </td>
+          <td>
+            If using the event log, this is the interval at which to dump
+            complete scheduler state (list of pools and jobs) to the log.
+            Integer value in milliseconds, default: 10000.
+          </td>
+          </tr>
+          -->
+        </table>
+      </section>  
+      <section>
+        <title>Allocation File (fair-scheduler.xml)</title>
+        <p>
+        The allocation file configures minimum shares, running job
+        limits, weights and preemption timeouts for each pool.
+        Only users/pools whose values differ from the defaults need to be
+        explicitly configured in this file.
+        The allocation file is located in
+        <em>HADOOP_HOME/conf/fair-scheduler.xml</em>.
+        It can contain the following types of elements:
+        </p>
+        <ul>
+        <li><em>pool</em> elements, which configure each pool.
+        These may contain the following sub-elements:
+          <ul>
+          <li><em>minMaps</em> and <em>minReduces</em>,
+            to set the pool's minimum share of task slots.</li>
+          <li><em>maxMaps</em> and <em>maxReduces</em>, to set the
+            pool's maximum concurrent task slots.</li>
+          <li><em>schedulingMode</em>, the pool's internal scheduling mode,
+          which can be <em>fair</em> for fair sharing or <em>fifo</em> for
+          first-in-first-out.</li>
+          <li><em>maxRunningJobs</em>, 
+          to limit the number of jobs from the 
+          pool to run at once (defaults to infinite).</li>
+          <li><em>weight</em>, to share the cluster 
+          non-proportionally with other pools. For example, a pool with weight 2.0 will get a 2x higher share than other pools. The default weight is 1.0.</li>
+          <li><em>minSharePreemptionTimeout</em>, the
+            number of seconds the pool will wait before
+            killing other pools' tasks if it is below its minimum share
+            (defaults to infinite).</li>
           </ul>
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.weightadjuster
-        </td>
-        <td>
-        An extensibility point that lets you specify a class to adjust the 
-        weights of running jobs. This class should implement the 
-        <em>WeightAdjuster</em> interface. There is currently one example 
-        implementation - <em>NewJobWeightBooster</em>, which increases the 
-        weight of jobs for the first 5 minutes of their lifetime to let 
-        short jobs finish faster. To use it, set the weightadjuster 
-        property to the full class name, 
-        <code>org.apache.hadoop.mapred.NewJobWeightBooster</code> 
-        NewJobWeightBooster itself provides two parameters for setting the 
-        duration and boost factor. <br/>
-        <ol>
-        <li> <em>mapred.newjobweightbooster.factor</em>
-          Factor by which new jobs weight should be boosted. Default is 3</li>
-        <li><em>mapred.newjobweightbooster.duration</em>
-          Duration in milliseconds, default 300000 for 5 minutes</li>
-        </ol>
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.loadmanager
-        </td>
-        <td>
-          An extensibility point that lets you specify a class that determines 
-          how many maps and reduces can run on a given TaskTracker. This class 
-          should implement the LoadManager interface. By default the task caps 
-          in the Hadoop config file are used, but this option could be used to 
-          make the load based on available memory and CPU utilization for example.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.taskselector:
-        </td>
-        <td>
-        An extensibility point that lets you specify a class that determines 
-        which task from within a job to launch on a given tracker. This can be 
-        used to change either the locality policy (e.g. keep some jobs within 
-        a particular rack) or the speculative execution algorithm (select 
-        when to launch speculative tasks). The default implementation uses 
-        Hadoop's default algorithms from JobInProgress.
-        </td>
-        </tr>
-      </table>      
+        </li>
+        <li><em>user</em> elements, which may contain a 
+        <em>maxRunningJobs</em> element to limit 
+        jobs. Note that by default, there is a pool for each 
+        user, so per-user limits are not necessary.</li>
+        <li><em>poolMaxJobsDefault</em>, which sets the default running 
+        job limit for any pools whose limit is not specified.</li>
+        <li><em>userMaxJobsDefault</em>, which sets the default running 
+        job limit for any users whose limit is not specified.</li>
+        <li><em>defaultMinSharePreemptionTimeout</em>, 
+        which sets the default minimum share preemption timeout 
+        for any pools where it is not specified.</li>
+        <li><em>fairSharePreemptionTimeout</em>, 
+        which sets the preemption timeout used when jobs are below half
+        their fair share.</li>
+        <li><em>defaultPoolSchedulingMode</em>, which sets the default scheduling 
+        mode (<em>fair</em> or <em>fifo</em>) for pools whose mode is
+        not specified.</li>
+        </ul>
+        <p>
+        Pool and user elements are only required if you are setting
+        non-default values for the pool/user. That is, you do not need to
+        declare all users and all pools in your config file before running
+        the fair scheduler. If a user or pool is not listed in the config file,
+        the default values for limits, preemption timeouts, etc. will be used.
+        </p>
+        <p>
+        An example allocation file is given below:</p>
+<source>
+&lt;?xml version="1.0"?&gt;  
+&lt;allocations&gt;  
+  &lt;pool name="sample_pool"&gt;
+    &lt;minMaps&gt;5&lt;/minMaps&gt;
+    &lt;minReduces&gt;5&lt;/minReduces&gt;
+    &lt;maxMaps&gt;25&lt;/maxMaps&gt;
+    &lt;maxReduces&gt;25&lt;/maxReduces&gt;
+    &lt;minSharePreemptionTimeout&gt;300&lt;/minSharePreemptionTimeout&gt;
+  &lt;/pool&gt;
+  &lt;user name="sample_user"&gt;
+    &lt;maxRunningJobs&gt;6&lt;/maxRunningJobs&gt;
+  &lt;/user&gt;
+  &lt;userMaxJobsDefault&gt;3&lt;/userMaxJobsDefault&gt;
+  &lt;fairSharePreemptionTimeout&gt;600&lt;/fairSharePreemptionTimeout&gt;
+&lt;/allocations&gt;
+</source>
+        <p>
+        This example creates a pool sample_pool with a guarantee of 5 map 
+        slots and 5 reduce slots. The pool also has a minimum share preemption
+        timeout of 300 seconds (5 minutes), meaning that if it does not get its
+        guaranteed share within this time, it is allowed to kill tasks from
+        other pools to achieve its share. The pool has a cap of 25 map and 25
+        reduce slots, which means that once 25 tasks are running, no more will
+        be scheduled even if the pool's fair share is higher.
+        The example also limits the number of running jobs 
+        per user to 3, except for sample_user, who can run 6 jobs concurrently. 
+        Finally, the example sets a fair share preemption timeout of 600 seconds
+        (10 minutes). If a job is below half its fair share for 10 minutes, it
+        will be allowed to kill tasks from other jobs to achieve its share.
+        Note that the preemption settings require preemption to be
+        enabled in <em>mapred-site.xml</em> as described earlier.
+        </p>
+        <p>
+        Any pool not defined in the allocation file will have no guaranteed 
+        capacity and no preemption timeout. Also, any pool or user with no max 
+        running jobs set in the file will be allowed to run an unlimited 
+        number of jobs.
+        </p>
+      </section>
+      <section>
+        <title>Access Control Lists (ACLs)</title>
+        <p>
+          The fair scheduler can be used in tandem with the "queue" based access
+          control system in MapReduce to restrict which pools each user can access.
+          To do this, first enable ACLs and set up some queues as described in the
+          <a href="mapred_tutorial.html#Job+Authorization">MapReduce usage guide</a>,
+          then set the fair scheduler to use one pool per queue by adding
+          the following property in <em>HADOOP_CONF_DIR/mapred-site.xml</em>:
+        </p>
+<source>
+&lt;property&gt;
+  &lt;name&gt;mapred.fairscheduler.poolnameproperty&lt;/name&gt;
+  &lt;value&gt;mapred.job.queue.name&lt;/value&gt;
+&lt;/property&gt;
+</source>
+        <p>
+          You can then set the minimum share, weight, and internal scheduling mode
+          for each pool as described earlier.
+          In addition, make sure that users submit jobs to the right queue by setting
+          the <em>mapred.job.queue.name</em> property in their jobs.
+        </p>
+      </section>
     </section>
     <section>
     <title> Administration</title>
@@ -280,14 +472,15 @@
     </p> 
     <ol>
     <li>
-      It is possible to modify pools' allocations 
-      and user and pool running job limits at runtime by editing the allocation 
-      config file. The scheduler will reload this file 10-15 seconds after it 
+      It is possible to modify minimum shares, limits, weights, preemption
+      timeouts and pool scheduling modes at runtime by editing the allocation
+      file. The scheduler will reload this file 10-15 seconds after it 
       sees that it was modified.
      </li>
      <li>
      Current jobs, pools, and fair shares  can be examined through the 
-     JobTracker's web interface, at  http://&lt;jobtracker URL&gt;/scheduler. 
+     JobTracker's web interface, at
+     <em>http://&lt;JobTracker URL&gt;/scheduler</em>. 
      On this interface, it is also possible to modify jobs' priorities or 
      move jobs from one pool to another and see the effects on the fair 
      shares (this requires JavaScript).
@@ -312,25 +505,37 @@
      the job has had, but on average it will get its fair share amount.</li>
      </ul>
      <p>
-     In addition, it is possible to turn on an "advanced" view for the web UI,
-     by going to http://&lt;jobtracker URL&gt;/scheduler?advanced. This view shows 
-     four more columns used for calculations internally:
+     In addition, it is possible to view an "advanced" version of the web 
+     UI by going to <em>http://&lt;JobTracker URL&gt;/scheduler?advanced</em>. 
+     This view shows two more columns:
      </p>
      <ul>
      <li><em>Maps/Reduce Weight</em>: Weight of the job in the fair sharing 
      calculations. This depends on priority and potentially also on 
      job size and job age if the <em>sizebasedweight</em> and 
      <em>NewJobWeightBooster</em> are enabled.</li>
-     <li><em>Map/Reduce Deficit</em>: The job's scheduling deficit in machine-
-     seconds - the amount of resources it should have gotten according to 
-     its fair share, minus how many it actually got. Positive deficit means
-      the job will be scheduled again in the near future because it needs to 
-      catch up to its fair share. The scheduler schedules jobs with higher 
-      deficit ahead of others. Please see the Implementation section of 
-      this document for details.</li>
      </ul>
     </section>
     <section>
+      <title>Metrics</title>
+      <p>
+        The fair scheduler can export metrics using the Hadoop metrics interface.
+        This can be enabled by adding an entry to <code>hadoop-metrics.properties</code>
+        to enable the <code>fairscheduler</code> metrics context. For example, to
+        simply retain the metrics in memory so they may be viewed in the <code>/metrics</code>
+        servlet:
+      </p>
+      <p>
+        <code>fairscheduler.class=org.apache.hadoop.metrics.spi.NoEmitMetricsContext</code>
+      </p>
+      <p>
+        Metrics are generated for each pool and job, and contain the same information that
+        is visible on the <code>/scheduler</code> web page.
+      </p>
+    </section>
+
+    <!--
+    <section>
     <title>Implementation</title>
     <p>There are two aspects to implementing fair scheduling: Calculating 
     each job's fair share, and choosing which job to run when a task slot 
@@ -359,13 +564,31 @@
      This capacity is divided among the jobs in that pool according again to 
      their weights.
      </p>
-     <p>Finally, when limits on a user's running jobs or a pool's running jobs 
+     <p>When limits on a user's running jobs or a pool's running jobs 
      are in place, we choose which jobs get to run by sorting all jobs in order 
      of priority and then submit time, as in the standard Hadoop scheduler. Any 
      jobs that fall after the user/pool's limit in this ordering are queued up 
      and wait idle until they can be run. During this time, they are ignored 
      from the fair sharing calculations and do not gain or lose deficit (their 
      fair share is set to zero).</p>
+     <p>
+     Preemption is implemented by periodically checking whether jobs are
+     below their minimum share or below half their fair share. If a job has
+     been below its share for sufficiently long, it is allowed to kill
+     other jobs' tasks. The tasks chosen are the most-recently-launched
+     tasks from over-allocated jobs, to minimize the amount of wasted
+     computation.
+     </p>
+     <p>
+     Finally, the fair scheduler provides several extension points where
+     the basic functionality can be extended. For example, the weight
+     calculation can be modified to give a priority boost to new jobs,
+     implementing a "shortest job first" policy which reduces response
+     times for interactive jobs even further.
+     These extension points are listed in
+     <a href="#Scheduler+Parameters+in+mapred-site.xml">Advanced Parameters</a>.
+     </p>
     </section>
+    -->
   </body>  
 </document>

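The documentation above states that fair shares are proportional to weights (for example, a weight-2.0 pool gets twice the share of a default weight-1.0 pool). The following is a purely illustrative calculation of that proportionality, assuming two pools that both have waiting tasks and 30 available map slots; it is not the scheduler's actual code:

    public class FairShareExample {
      public static void main(String[] args) {
        double[] weights = { 2.0, 1.0 };  // a weight-2.0 pool and a default pool
        int slots = 30;                   // map slots to divide between them
        double totalWeight = 0;
        for (double w : weights) {
          totalWeight += w;
        }
        for (double w : weights) {
          // Shares are proportional to weight: prints 20.0, then 10.0.
          System.out.println("fair share = " + (slots * w / totalWeight) + " slots");
        }
      }
    }
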
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sun Sep 11 23:57:37 2011
@@ -1367,8 +1367,8 @@ public class JobInProgress {
     }
   }
   
-  public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
-                                                     int clusterSize, 
+  public synchronized Task obtainNewNodeLocalMapTask(TaskTrackerStatus tts,
+                                                     int clusterSize,
                                                      int numUniqueHosts)
   throws IOException {
     if (!tasksInited) {
@@ -1378,6 +1378,31 @@ public class JobInProgress {
       return null;
     }
 
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 1, 
+                                status.mapProgress());
+    if (target == -1) {
+      return null;
+    }
+
+    Task result = maps[target].getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+      resetSchedulingOpportunities();
+    }
+
+    return result;
+  }
+  
+  public synchronized Task obtainNewNodeOrRackLocalMapTask(
+      TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
+  throws IOException {
+    if (!tasksInited) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      try { throw new IOException("state = " + status.getRunState()); }
+      catch (IOException ioe) {ioe.printStackTrace();}
+      return null;
+    }
+
     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
                                 status.mapProgress());
     if (target == -1) {
@@ -3504,4 +3529,31 @@ public class JobInProgress {
     LOG.info("jobToken generated and stored with users keys in "
         + keysFile.toUri().getPath());
   }
+
+  /**
+   * Get the level of locality that a given task would have if launched on
+   * a particular TaskTracker. Returns 0 if the task has data on that machine,
+   * 1 if it has data on the same rack, etc (depending on number of levels in
+   * the network hierarchy).
+   */
+  int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
+    Node tracker = jobtracker.getNode(tts.getHost());
+    int level = this.maxLevel;
+    // find the right level across split locations
+    for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
+      Node datanode = jobtracker.getNode(local);
+      int newLevel = this.maxLevel;
+      if (tracker != null && datanode != null) {
+        newLevel = getMatchingLevelForNodes(tracker, datanode);
+      }
+      if (newLevel < level) {
+        level = newLevel;
+        // an optimization
+        if (level == 0) {
+          break;
+        }
+      }
+    }
+    return level;
+  }
 }

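The JobInProgress change above replaces obtainNewLocalMapTask() with a strictly node-local variant (obtainNewNodeLocalMapTask) and a node-or-rack-local variant (obtainNewNodeOrRackLocalMapTask), so a scheduler can insist on node locality first and relax to rack locality later. A hedged sketch of that calling pattern; the allowRackLocal flag stands in for whatever delay or skip policy the caller applies and is not part of this commit:

    // Prefer a node-local map task; fall back to rack locality only when allowed.
    Task assignLocalMap(JobInProgress job, TaskTrackerStatus tts, int clusterSize,
                        int numUniqueHosts, boolean allowRackLocal) throws IOException {
      Task t = job.obtainNewNodeLocalMapTask(tts, clusterSize, numUniqueHosts);
      if (t == null && allowRackLocal) {
        t = job.obtainNewNodeOrRackLocalMapTask(tts, clusterSize, numUniqueHosts);
      }
      return t;
    }
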
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Sun Sep 11 23:57:37 2011
@@ -168,8 +168,8 @@ class JobQueueTaskScheduler extends Task
           
           // Try to schedule a node-local or rack-local Map task
           t = 
-            job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
-                                      taskTrackerManager.getNumberOfUniqueHosts());
+            job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
+                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
             assignedTasks.add(t);
             ++numLocalMaps;

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Sun Sep 11 23:57:37 2011
@@ -88,7 +88,17 @@ interface TaskTrackerManager {
    * @return jobInProgress object
    */
   public JobInProgress getJob(JobID jobid);
-  
+
+  /**
+   * Mark the task attempt identified by taskid to be killed
+   * 
+   * @param taskid task to kill
+   * @param shouldFail whether to count the task as failed
+   * @return true if the task was found and successfully marked to kill
+   */
+  public boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException;  
+
   /**
    * Initialize the Job
    * 

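For context, the killTask() hook added above is what a preempting scheduler calls to reclaim a slot. A minimal sketch of a caller is given here; the helper name and error handling are illustrative, and choosing which attempt to kill is out of scope:

    // shouldFail=false counts the attempt as killed rather than failed, so the
    // preempted job is not penalized and simply reruns the task later.
    boolean preemptAttempt(TaskTrackerManager ttm, TaskAttemptID attempt)
        throws IOException {
      boolean killed = ttm.killTask(attempt, false);
      if (!killed) {
        System.err.println("Could not preempt " + attempt + "; it may have finished");
      }
      return killed;
    }
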
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Sun Sep 11 23:57:37 2011
@@ -60,7 +60,7 @@ public class TestJobQueueTaskScheduler e
     }
 
     @Override
-    public Task obtainNewLocalMapTask(TaskTrackerStatus tts, int clusterSize, 
+    public Task obtainNewNodeOrRackLocalMapTask(TaskTrackerStatus tts, int clusterSize, 
                                       int ignored) 
     throws IOException {
       return obtainNewMapTask(tts, clusterSize, ignored);
@@ -204,6 +204,12 @@ public class TestJobQueueTaskScheduler e
     public void failJob(JobInProgress job) {
       // do nothing
     }
+
+    @Override
+    public boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException {
+      return false;
+    }
     
     // Test methods
     

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskControllerLaunchArgs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskControllerLaunchArgs.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskControllerLaunchArgs.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskControllerLaunchArgs.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
+import org.apache.hadoop.mapred.Task;
+
+import junit.framework.TestCase;
+
+/**
+ * The test case tests whether {@link LinuxTaskController} passes all required
+ * jvm properties in its initializeJob() and truncateLogsAsUser() methods,
+ * which launch a JVM through the native task-controller.
+ */
+public class TestLinuxTaskControllerLaunchArgs extends TestCase {
+  private static final Log LOG = LogFactory.getLog(
+                                   TestLinuxTaskControllerLaunchArgs.class);
+  private static File testDir = new File(System.getProperty("test.build.data",
+                "/tmp"), TestLinuxTaskControllerLaunchArgs.class.getName());
+  private static File fakeTaskController = new File(testDir, "faketc.sh");
+  private static File mapredLocal = new File(testDir, "mapred/local");
+  private TaskController ltc;
+  private boolean initialized = false;
+  private String user = new String("testuser");
+  private InetSocketAddress addr = new InetSocketAddress("localhost", 3209);
+
+  Configuration conf = new Configuration();
+
+  // Do-nothing fake {@link MapTask} class
+  public static class MyMapTask extends MapTask {
+    @Override
+    public void write(DataOutput out) throws IOException {
+      // nothing
+    }
+  }
+
+  
+  // The shell script is used to fake the native task-controller.
+  // It checks the arguments for required java properties and args.
+  protected void createFakeTCScript() throws Exception {
+    FileWriter out = new FileWriter(fakeTaskController);
+    out.write("#!/bin/bash\n");
+    // setup() calls with zero args and expects 1 in return.
+    out.write("if [ $# -eq 0 ]; then exit 1; fi\n");
+
+    // Check for java, classpath, h.log.dir, h.root.logger and java.library.path
+    out.write("for LARG in \"$@\"\n");
+    out.write("do case \"$LARG\" in\n");
+    out.write("*/java) LTC_ARG1=1;;\n");
+    out.write("-classpath) LTC_ARG2=1;;\n");
+    out.write("-Dhadoop.log.dir*) LTC_ARG3=1;;\n");
+    out.write("-Dhadoop.root.logger*) LTC_ARG4=1;;\n");
+    out.write("-Djava.library.path*) LTC_ARG5=1;;\n");
+    out.write("esac; done\n");
+    out.write("LTC_ARGS=$((LTC_ARG1+LTC_ARG2+LTC_ARG3+LTC_ARG4+LTC_ARG5))\n");
+    out.write("if [ $LTC_ARGS -eq 5 ]; then exit 0; else exit 22; fi\n");
+    out.close();
+    fakeTaskController.setExecutable(true);
+  }
+
+  protected void initMyTest() throws Exception {
+    testDir.mkdirs();
+    mapredLocal.mkdirs();
+    createFakeTCScript();
+    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, mapredLocal.toString());
+
+    // Set the task-controller binary path.
+    conf.set("mapreduce.tasktracker.task-controller.exe", fakeTaskController.toString());
+    ltc = new LinuxTaskController();
+    ltc.setConf(conf);
+
+    // LinuxTaskController runs task-controller in setup() with no
+    // arguments and expects exit code 1 in return.
+    try {
+      ltc.setup(new LocalDirAllocator(mapredLocal.toString()),
+                 new LocalStorage(new String[]{mapredLocal.toString()}));
+    } catch (IOException ie) {
+      fail("Error running task-controller from setup().");
+    }
+
+    initialized = true;
+  }
+
+
+  /**
+   * LinuxTaskController runs task-controller, which in turn runs JobLocalizer,
+   * in initializeJob(). task-controller should be provided with all the
+   * Java properties necessary to launch JobLocalizer successfully.
+   */
+  public void testLTCCallInitializeJob() throws Exception {
+    if (!initialized) {
+      initMyTest();
+    }
+    
+    try {
+      ltc.initializeJob(user, "jobid", new Path("/cred.xml"),
+                                       new Path("/job.xml"), null, addr);
+    } catch (IOException ie) {
+      fail("Missing argument when running task-controller from " +
+                                                   "initializeJob().\n");
+    }
+  }
+
+  /**
+   * LinuxTaskController runs task-controller, which in turn runs TaskLogsTruncater,
+   * in truncateLogsAsUser(). task-controller should be provided with all the
+   * Java properties necessary to launch TaskLogsTruncater successfully.
+   */
+  public void testLTCCallTruncateLogsAsUser() throws Exception {
+    if (!initialized) {
+      initMyTest();
+    }
+
+    List<Task> tasks = new ArrayList<Task>();
+    tasks.add(new MyMapTask());
+
+    try {
+      ltc.truncateLogsAsUser(user, tasks);
+    } catch (IOException ie) {
+      fail("Missing argument when running task-controller from " +
+                                               "truncateLogsAsUser()\n");
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Sun Sep 11 23:57:37 2011
@@ -155,6 +155,13 @@ public class TestParallelInitialization 
         failJob(job);
       }
     }
+    
+    @Override
+    public boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException {
+      return false;
+    }
+
     // Test methods
     
     public synchronized void failJob(JobInProgress job) {

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/TestSleepJob.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/TestSleepJob.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/TestSleepJob.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing the MR framework. Sleeps for a defined period
+ * of time in the mapper and reducer. Generates fake input for map / reduce
+ * jobs. Note that the generated number of input pairs is on the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class TestSleepJob extends Configured implements Tool {
+  public static String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
+  public static String REDUCE_SLEEP_COUNT = 
+    "mapreduce.sleepjob.reduce.sleep.count";
+  public static String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
+  public static String REDUCE_SLEEP_TIME = 
+    "mapreduce.sleepjob.reduce.sleep.time";
+
+  public static class TestSleepJobPartitioner extends 
+      Partitioner<IntWritable, NullWritable> {
+    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+      return k.get() % numPartitions;
+    }
+  }
+  
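+  // A zero-length split: the sleep job needs splits only to control the
+  // number of map tasks, not to carry any real input data.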
+  public static class EmptySplit extends InputSplit implements Writable {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
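+  // Synthesizes MAP_SLEEP_COUNT records per split in memory so each mapper
+  // has records to sleep over; no input files are actually read.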
+  public static class SleepInputFormat 
+      extends InputFormat<IntWritable,IntWritable> {
+    
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt("mapred.map.tasks", 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+    
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+      final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+      final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
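+      // Each map emits redcount keys per reducer in total, so every reducer
+      // ends up with reduceSleepCount keys to sleep over.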
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+      
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        public boolean nextKeyValue()
+            throws IOException {
+          if (count == 0) {
+            return false;
+          }
+          key = new IntWritable();
+          key.set(emitCount);
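+          // Spread emitPerMapTask output records evenly across the count
+          // input records, giving the remainder to the earliest records.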
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return count == 0 ? 1.0f : records / ((float)count);
+        }
+      };
+    }
+  }
+
+  public static class SleepMapper 
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
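+      // Per-record sleep = total map sleep time divided by the number of
+      // records each map processes, so the whole map sleeps MAP_SLEEP_TIME.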
+      this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+        conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+    }
+
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      // Every map is expected to process mapSleepCount records.
+      try {
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        Thread.sleep(mapSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // Emit value.get() keys per record (reduceSleepCount * numReduce in
+      // total), so that each reducer will get reduceSleepCount keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
+    }
+  }
+  
+  public static class SleepReducer  
+      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
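+      // Per-key sleep = total reduce sleep time divided by the number of
+      // keys each reducer receives, so the whole reduce sleeps REDUCE_SLEEP_TIME.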
+      this.reduceSleepDuration = reduceSleepCount == 0 ? 0 : 
+        conf.getLong(REDUCE_SLEEP_TIME , 100) / reduceSleepCount;
+    }
+
+    public void reduce(IntWritable key, Iterable<NullWritable> values,
+                       Context context)
+      throws IOException {
+      try {
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        Thread.sleep(reduceSleepDuration);
+      
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
+    }
+  }
+
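+  // Usage sketch (the jar path is an assumption, not part of this commit):
+  //   hadoop jar <test-jar> org.apache.hadoop.mapreduce.TestSleepJob \
+  //       -m 4 -r 2 -mt 2000 -rt 1000 -recordt 500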
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TestSleepJob(), args);
+    System.exit(res);
+  }
+
+  public Job createJob(int numMapper, int numReducer, 
+                       long mapSleepTime, int mapSleepCount, 
+                       long reduceSleepTime, int reduceSleepCount) 
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    conf.setInt("mapred.map.tasks", numMapper);
+    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+    Job job = new Job(conf, "sleep");
+    job.setNumReduceTasks(numReducer);
+    job.setJarByClass(TestSleepJob.class);
+    job.setMapperClass(SleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(TestSleepJobPartitioner.class);
+    job.setJobName("Sleep job");
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    return job;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 1) {
+      System.err.println("TestSleepJob [-m numMapper] [-r numReducer]" +
+          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+          " [-recordt recordSleepTime (msec)]");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+
+    int numMapper = 1, numReducer = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+    int mapSleepCount = 1, reduceSleepCount = 1;
+
+    for(int i=0; i < args.length; i++ ) {
+      if(args[i].equals("-m")) {
+        numMapper = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-r")) {
+        numReducer = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-mt")) {
+        mapSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-rt")) {
+        reduceSleepTime = Long.parseLong(args[++i]);
+      }
+      else if (args[i].equals("-recordt")) {
+        recSleepTime = Long.parseLong(args[++i]);
+      }
+    }
+    
+    // Each task sleeps for a total of *SleepTime, in naps of recSleepTime per record.
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+    Job job = createJob(numMapper, numReducer, mapSleepTime,
+                mapSleepCount, reduceSleepTime, reduceSleepCount);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+}


