hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r690093 [2/2] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/fairscheduler/ src/contrib/fairscheduler/src/ src/contrib/fairscheduler/src/java/ src/contrib/fairscheduler/src/java/org/ src/contrib/fairscheduler/src/java/org/apache/ src/...
Date Fri, 29 Aug 2008 03:50:07 GMT
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * Maintains the collection of pools, along with their minimum slot
+ * allocations and running-job limits.
+ */
+public class PoolManager {
+  public static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.mapred.PoolManager");
+
+  /** Time to wait between checks of the allocation file */
+  public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
+  
+  /**
+   * Time to wait after the allocation has been modified before reloading it
+   * (this is done to prevent loading a file that hasn't been fully written).
+   */
+  public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 
+  
+  // Map and reduce minimum allocations for each pool
+  private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
+  private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+  
+  // Max concurrent running jobs for each pool and for each user; in addition,
+  // for users that have no max specified, we use the userMaxJobsDefault.
+  private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
+  private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+  private int userMaxJobsDefault = Integer.MAX_VALUE;
+
+  private String allocFile; // Path to XML file containing allocations
+  private String poolNameProperty; // Jobconf property to use for determining a
+                                   // job's pool name (default: queue.name)
+  
+  private Map<String, Pool> pools = new HashMap<String, Pool>();
+  
+  private long lastReloadAttempt; // Last time we tried to reload the pools file
+  private long lastSuccessfulReload; // Last time we successfully reloaded pools
+  private boolean lastReloadAttemptFailed = false;
+
+  public PoolManager(Configuration conf) throws IOException, SAXException,
+      AllocationConfigurationException, ParserConfigurationException {
+    this.poolNameProperty = conf.get(
+        "mapred.fairscheduler.poolnameproperty", "queue.name");
+    this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
+    if (allocFile == null) {
+      LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
+          "the fair scheduler will not use any queues.");
+    }
+    reloadAllocs();
+    lastSuccessfulReload = System.currentTimeMillis();
+    lastReloadAttempt = System.currentTimeMillis();
+    // Create the default pool so that it shows up in the web UI
+    getPool(Pool.DEFAULT_POOL_NAME);
+  }
+  
+  /**
+   * Get a pool by name, creating it if necessary
+   */
+  public synchronized Pool getPool(String name) {
+    Pool pool = pools.get(name);
+    if (pool == null) {
+      pool = new Pool(name);
+      pools.put(name, pool);
+    }
+    return pool;
+  }
+
+  /**
+   * Reload the allocations file if it has been modified since it was last
+   * loaded, checking at most once every ALLOC_RELOAD_INTERVAL milliseconds.
+   */
+  public void reloadAllocsIfNecessary() {
+    long time = System.currentTimeMillis();
+    if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
+      lastReloadAttempt = time;
+      try {
+        File file = new File(allocFile);
+        long lastModified = file.lastModified();
+        if (lastModified > lastSuccessfulReload &&
+            time > lastModified + ALLOC_RELOAD_WAIT) {
+          reloadAllocs();
+          lastSuccessfulReload = time;
+          lastReloadAttemptFailed = false;
+        }
+      } catch (Exception e) {
+        // Throwing the error further out here won't help - the RPC thread
+        // will catch it and report it in a loop. Instead, just log it and
+        // hope somebody will notice from the log.
+        // We log the error only on the first failure so we don't fill up the
+        // JobTracker's log with these messages.
+        if (!lastReloadAttemptFailed) {
+          LOG.error("Failed to reload allocations file - " +
+              "will use existing allocations.", e);
+        }
+        lastReloadAttemptFailed = true;
+      }
+    }
+  }
+  
+  /**
+   * Updates the allocation list from the allocation config file. This file is
+   * expected to be an XML document with a top-level <allocations> element
+   * containing any number of the following child elements:
+   * 
+   * <code>
+   * <pool name="...">: may contain <minMaps>, <minReduces> and
+   *     <maxRunningJobs> sub-elements
+   * <user name="...">: may contain a <maxRunningJobs> sub-element
+   * <userMaxJobsDefault>: running-job limit for users with no explicit limit
+   * </code>
+   * 
+   * XML comments are ignored, and unrecognized top-level elements are
+   * logged and skipped.
+   *  
+   * @throws IOException if the config file cannot be read.
+   * @throws AllocationConfigurationException if allocations are invalid.
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   * @throws SAXException if config file is malformed.
+   */
+  public void reloadAllocs() throws IOException, ParserConfigurationException, 
+      SAXException, AllocationConfigurationException {
+    if (allocFile == null) return;
+    // Create temporary hashmaps to hold the new allocs; we save them into
+    // our fields only once the entire allocs file has parsed successfully.
+    Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
+    Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+    Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
+    Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+    int userMaxJobsDefault = Integer.MAX_VALUE;
+    
+    // Remember all pool names so we can display them on web UI, etc.
+    List<String> poolNamesInAllocFile = new ArrayList<String>();
+    
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+      DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(new File(allocFile));
+    Element root = doc.getDocumentElement();
+    if (!"allocations".equals(root.getTagName()))
+      throw new AllocationConfigurationException("Bad allocations file: " + 
+          "top-level element not <allocations>");
+    NodeList elements = root.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (!(node instanceof Element))
+        continue;
+      Element element = (Element)node;
+      if ("pool".equals(element.getTagName())) {
+        String poolName = element.getAttribute("name");
+        poolNamesInAllocFile.add(poolName);
+        NodeList fields = element.getChildNodes();
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("minMaps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            mapAllocs.put(poolName, val);
+          } else if ("minReduces".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            reduceAllocs.put(poolName, val);
+          } else if ("maxRunningJobs".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            poolMaxJobs.put(poolName, val);
+          }
+        }
+      } else if ("user".equals(element.getTagName())) {
+        String userName = element.getAttribute("name");
+        NodeList fields = element.getChildNodes();
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("maxRunningJobs".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            userMaxJobs.put(userName, val);
+          }
+        }
+      } else if ("userMaxJobsDefault".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        userMaxJobsDefault = val;
+      } else {
+        LOG.warn("Bad element in allocations file: " + element.getTagName());
+      }
+    }
+    
+    // Commit the reload; also create any pool defined in the alloc file
+    // if it does not already exist, so it can be displayed on the web UI.
+    synchronized(this) {
+      this.mapAllocs = mapAllocs;
+      this.reduceAllocs = reduceAllocs;
+      this.poolMaxJobs = poolMaxJobs;
+      this.userMaxJobs = userMaxJobs;
+      this.userMaxJobsDefault = userMaxJobsDefault;
+      for (String name: poolNamesInAllocFile) {
+        getPool(name);
+      }
+    }
+  }
+
+  /**
+   * Get the minimum slot allocation for a particular pool and task type,
+   * or 0 if no minimum was specified.
+   */
+  public int getAllocation(String pool, TaskType taskType) {
+    Map<String, Integer> allocationMap = (taskType == TaskType.MAP ?
+        mapAllocs : reduceAllocs);
+    Integer alloc = allocationMap.get(pool);
+    return (alloc == null ? 0 : alloc);
+  }
+  
+  /**
+   * Add a job to the appropriate pool
+   */
+  public synchronized void addJob(JobInProgress job) {
+    getPool(getPoolName(job)).addJob(job);
+  }
+  
+  /**
+   * Remove a job
+   */
+  public synchronized void removeJob(JobInProgress job) {
+    getPool(getPoolName(job)).removeJob(job);
+  }
+  
+  /**
+   * Change the pool of a particular job
+   */
+  public synchronized void setPool(JobInProgress job, String pool) {
+    removeJob(job);
+    job.getJobConf().set(poolNameProperty, pool);
+    addJob(job);
+  }
+
+  /**
+   * Get a collection of all pools
+   */
+  public synchronized Collection<Pool> getPools() {
+    return pools.values();
+  }
+  
+  /**
+   * Get the pool name for a JobInProgress from its configuration. This uses
+   * the "queue.name" property in the jobconf by default, or the property set
+   * with "mapred.fairscheduler.poolnameproperty".
+   */
+  public String getPoolName(JobInProgress job) {
+    JobConf conf = job.getJobConf();
+    return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+  }
+
+  /**
+   * Get all pool names that have been seen either in the allocation file or in
+   * a MapReduce job.
+   */
+  public synchronized Collection<String> getPoolNames() {
+    List<String> list = new ArrayList<String>();
+    for (Pool pool: getPools()) {
+      list.add(pool.getName());
+    }
+    Collections.sort(list);
+    return list;
+  }
+
+  public int getUserMaxJobs(String user) {
+    if (userMaxJobs.containsKey(user)) {
+      return userMaxJobs.get(user);
+    } else {
+      return userMaxJobsDefault;
+    }
+  }
+
+  public int getPoolMaxJobs(String pool) {
+    if (poolMaxJobs.containsKey(pool)) {
+      return poolMaxJobs.get(pool);
+    } else {
+      return Integer.MAX_VALUE;
+    }
+  }
+}
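For reference, a minimal allocation file that reloadAllocs() above would
accept could look like this (the pool and user names are invented for
illustration; the element names are exactly those the parser recognizes):

  <?xml version="1.0"?>
  <allocations>
    <pool name="poolA">
      <minMaps>2</minMaps>
      <minReduces>1</minReduces>
      <maxRunningJobs>4</maxRunningJobs>
    </pool>
    <user name="alice">
      <maxRunningJobs>6</maxRunningJobs>
    </user>
    <userMaxJobsDefault>3</userMaxJobsDefault>
  </allocations>

The file's path is given by the mapred.fairscheduler.allocation.file
property; once loaded, it is re-checked every ALLOC_RELOAD_INTERVAL (10 s),
and a modified file is only re-read after it has been stable for
ALLOC_RELOAD_WAIT (5 s), so that a half-written file is not picked up.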

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pluggable object for selecting tasks to run from a {@link JobInProgress} on
+ * a given {@link TaskTracker}, for use by the {@link TaskScheduler}. The
+ * <code>TaskSelector</code> is in charge of managing both locality and
+ * speculative execution. For the latter purpose, it must also report how many
+ * speculative tasks each job needs to launch, so that the scheduler can take
+ * this into account in its calculations.
+ */
+public abstract class TaskSelector implements Configurable {
+  protected Configuration conf;
+  protected TaskTrackerManager taskTrackerManager;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public synchronized void setTaskTrackerManager(
+      TaskTrackerManager taskTrackerManager) {
+    this.taskTrackerManager = taskTrackerManager;
+  }
+  
+  /**
+   * Lifecycle method to allow the TaskSelector to start any work in separate
+   * threads.
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Lifecycle method to allow the TaskSelector to stop any work it is doing.
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * How many speculative map tasks does the given job want to launch?
+   * @param job The job to count speculative maps for
+   * @return Number of speculative maps that can be launched for job
+   */
+  public abstract int neededSpeculativeMaps(JobInProgress job);
+
+  /**
+   * How many speculative reduce tasks does the given job want to launch?
+   * @param job The job to count speculative reduces for
+   * @return Number of speculative reduces that can be launched for job
+   */
+  public abstract int neededSpeculativeReduces(JobInProgress job);
+  
+  /**
+   * Choose a map task to run from the given job on the given TaskTracker.
+   * @param taskTracker {@link TaskTrackerStatus} of machine to run on
+   * @param job Job to select a task for
+   * @return A {@link Task} to run on the machine, or <code>null</code> if
+   *         no map should be launched from this job on the task tracker.
+   * @throws IOException 
+   */
+  public abstract Task obtainNewMapTask(TaskTrackerStatus taskTracker,
+      JobInProgress job) throws IOException;
+
+  /**
+   * Choose a reduce task to run from the given job on the given TaskTracker.
+   * @param taskTracker {@link TaskTrackerStatus} of machine to run on
+   * @param job Job to select a task for
+   * @return A {@link Task} to run on the machine, or <code>null</code> if
+   *         no reduce should be launched from this job on the task tracker.
+   * @throws IOException 
+   */
+  public abstract Task obtainNewReduceTask(TaskTrackerStatus taskTracker,
+      JobInProgress job) throws IOException;
+}
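To make the contract concrete, here is a minimal sketch of a pass-through
selector (a hypothetical class, not part of this commit). It requests no
speculative tasks and delegates task choice to JobInProgress's own
locality-aware obtainNewMapTask/obtainNewReduceTask(tts, clusterSize,
numUniqueHosts) methods, the same signatures the test code below overrides:

  package org.apache.hadoop.mapred;

  import java.io.IOException;

  public class PassThroughTaskSelector extends TaskSelector {
    @Override
    public int neededSpeculativeMaps(JobInProgress job) {
      return 0; // never ask for speculative maps
    }

    @Override
    public int neededSpeculativeReduces(JobInProgress job) {
      return 0; // never ask for speculative reduces
    }

    @Override
    public Task obtainNewMapTask(TaskTrackerStatus taskTracker,
        JobInProgress job) throws IOException {
      ClusterStatus cluster = taskTrackerManager.getClusterStatus();
      return job.obtainNewMapTask(taskTracker, cluster.getTaskTrackers(),
          taskTrackerManager.getNumberOfUniqueHosts());
    }

    @Override
    public Task obtainNewReduceTask(TaskTrackerStatus taskTracker,
        JobInProgress job) throws IOException {
      ClusterStatus cluster = taskTrackerManager.getClusterStatus();
      return job.obtainNewReduceTask(taskTracker, cluster.getTaskTrackers(),
          taskTrackerManager.getNumberOfUniqueHosts());
    }
  }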

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Utility enum for map and reduce task types.
+ */
+public enum TaskType {
+  MAP, REDUCE
+}

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * A pluggable object for altering the weights of jobs in the fair scheduler.
+ * It is used, for example, by {@link NewJobWeightBooster} to give higher
+ * weight to new jobs so that short jobs finish faster.
+ * 
+ * Implementations may also implement {@link Configurable} to access
+ * configuration parameters.
+ */
+public interface WeightAdjuster {
+  public double adjustWeight(JobInProgress job, TaskType taskType,
+      double curWeight);
+}
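As an illustration, a sketch in the spirit of the NewJobWeightBooster
mentioned above (a hypothetical class; the factor, duration and property
names here are invented, and it assumes JobInProgress exposes its start
time via getStartTime()):

  package org.apache.hadoop.mapred;

  import org.apache.hadoop.conf.Configurable;
  import org.apache.hadoop.conf.Configuration;

  public class YoungJobWeightBooster implements WeightAdjuster, Configurable {
    private Configuration conf;
    private long boostDuration = 5 * 60 * 1000; // invented default: 5 minutes
    private double boostFactor = 3.0;           // invented default

    public void setConf(Configuration conf) {
      this.conf = conf;
      // hypothetical property names, for illustration only
      boostDuration = conf.getLong("mapred.youngjobbooster.duration",
          boostDuration);
      boostFactor = conf.getFloat("mapred.youngjobbooster.factor",
          (float) boostFactor);
    }

    public Configuration getConf() {
      return conf;
    }

    public double adjustWeight(JobInProgress job, TaskType taskType,
        double curWeight) {
      long age = System.currentTimeMillis() - job.getStartTime();
      return age < boostDuration ? curWeight * boostFactor : curWeight;
    }
  }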

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,1064 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+
+public class TestFairScheduler extends TestCase {
+  final static String TEST_DIR = new File(".").getAbsolutePath();
+  final static String ALLOC_FILE = new File("./test-pools").getAbsolutePath();
+  private static final String POOL_PROPERTY = "pool";
+  
+  private static int jobCounter;
+  private static int taskCounter;
+  
+  static class FakeJobInProgress extends JobInProgress {
+    
+    private FakeTaskTrackerManager taskTrackerManager;
+    
+    public FakeJobInProgress(JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf);
+      this.taskTrackerManager = taskTrackerManager;
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus();
+      this.status.setRunState(JobStatus.PREP);
+    }
+    
+    @Override
+    public synchronized void initTasks() throws IOException {
+      // do nothing
+    }
+
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      TaskAttemptID attemptId = getTaskAttemptID(true);
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningMapTasks++;
+      return task;
+    }
+    
+    @Override
+    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
+      TaskAttemptID attemptId = getTaskAttemptID(false);
+      Task task = new ReduceTask("", attemptId, 0, 10) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningReduceTasks++;
+      return task;
+    }
+    
+    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+      JobID jobId = getJobID();
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), isMap, ++taskCounter, 0);
+    }
+  }
+  
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 2;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskStatus> taskStatuses = 
+      new HashMap<String, TaskStatus>();
+
+    public FakeTaskTrackerManager() {
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    @Override
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, maps, reduces,
+          numTrackers * maxMapTasksPerTracker,
+          numTrackers * maxReduceTasksPerTracker,
+          JobTracker.State.RUNNING);
+    }
+
+    @Override
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    @Override
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+
+    @Override
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    @Override
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+    
+    // Test methods
+    
+    public void submitJob(JobInProgress job) {
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+    
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return trackers.get(trackerID);
+    }
+    
+    public void startTask(String taskTrackerName, final Task t) {
+      if (t.isMapTask()) {
+        maps++;
+      } else {
+        reduces++;
+      }
+      TaskStatus status = new TaskStatus() {
+        @Override
+        public boolean getIsMap() {
+          return t.isMapTask();
+        }
+      };
+      taskStatuses.put(t.getTaskID().toString(), status);
+      status.setRunState(TaskStatus.State.RUNNING);
+      trackers.get(taskTrackerName).getTaskReports().add(status);
+    }
+    
+    public void finishTask(String taskTrackerName, String tipId) {
+      TaskStatus status = taskStatuses.get(tipId);
+      if (status.getIsMap()) {
+        maps--;
+      } else {
+        reduces--;
+      }
+      status.setRunState(TaskStatus.State.SUCCEEDED);
+    }
+  }
+  
+  protected class FakeClock extends FairScheduler.Clock {
+    private long time = 0;
+    
+    public void advance(long millis) {
+      time += millis;
+    }
+
+    @Override
+    long getTime() {
+      return time;
+    }
+  }
+  
+  protected JobConf conf;
+  protected FairScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+  private FakeClock clock;
+
+  @Override
+  protected void setUp() throws Exception {
+    jobCounter = 0;
+    taskCounter = 0;
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    // Create an empty pools file (so we can add/remove pools later)
+    FileWriter fileWriter = new FileWriter(ALLOC_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    fileWriter.write("<allocations />\n");
+    fileWriter.close();
+    conf = new JobConf();
+    conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
+    conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
+    taskTrackerManager = new FakeTaskTrackerManager();
+    clock = new FakeClock();
+    scheduler = new FairScheduler(clock, false);
+    scheduler.setConf(conf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.start();
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.terminate();
+    }
+  }
+  
+  private JobInProgress submitJob(int state, int maps, int reduces)
+      throws IOException {
+    return submitJob(state, maps, reduces, null);
+  }
+  
+  private JobInProgress submitJob(int state, int maps, int reduces, String pool)
+      throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reduces);
+    if (pool != null)
+      jobConf.set(POOL_PROPERTY, pool);
+    JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    job.startTime = clock.time;
+    return job;
+  }
+  
+  protected void submitJobs(int number, int state, int maps, int reduces)
+    throws IOException {
+    for (int i = 0; i < number; i++) {
+      submitJob(state, maps, reduces);
+    }
+  }
+
+  public void testAllocationFileParsing() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>"); 
+    // Give pool A a minimum of 1 map, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>1</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    // Give pool B a minimum of 2 maps, 1 reduce
+    out.println("<pool name=\"poolB\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("</pool>");
+    // Give pool C min maps but no min reduces
+    out.println("<pool name=\"poolC\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("</pool>");
+    // Give pool D a limit of 3 running jobs
+    out.println("<pool name=\"poolD\">");
+    out.println("<maxRunningJobs>3</maxRunningJobs>");
+    out.println("</pool>");
+    // Set default limit of jobs per user to 5
+    out.println("<userMaxJobsDefault>5</userMaxJobsDefault>");
+    // Give user1 a limit of 10 jobs
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningJobs>10</maxRunningJobs>");
+    out.println("</user>");
+    out.println("</allocations>"); 
+    out.close();
+    
+    PoolManager poolManager = scheduler.getPoolManager();
+    poolManager.reloadAllocs();
+    
+    assertEquals(5, poolManager.getPools().size()); // 4 in file + default pool
+    assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
+        TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
+        TaskType.REDUCE));
+    assertEquals(1, poolManager.getAllocation("poolA", TaskType.MAP));
+    assertEquals(2, poolManager.getAllocation("poolA", TaskType.REDUCE));
+    assertEquals(2, poolManager.getAllocation("poolB", TaskType.MAP));
+    assertEquals(1, poolManager.getAllocation("poolB", TaskType.REDUCE));
+    assertEquals(2, poolManager.getAllocation("poolC", TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation("poolC", TaskType.REDUCE));
+    assertEquals(0, poolManager.getAllocation("poolD", TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation("poolD", TaskType.REDUCE));
+    assertEquals(Integer.MAX_VALUE, poolManager.getPoolMaxJobs("poolA"));
+    assertEquals(3, poolManager.getPoolMaxJobs("poolD"));
+    assertEquals(10, poolManager.getUserMaxJobs("user1"));
+    assertEquals(5, poolManager.getUserMaxJobs("user2"));
+  }
+  
+  public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
+  public void testNonRunningJobsAreIgnored() throws IOException {
+    submitJobs(1, JobStatus.PREP, 10, 10);
+    submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
+    submitJobs(1, JobStatus.FAILED, 10, 10);
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    advanceTime(100); // Check that we still don't assign jobs after an update
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
+  /**
+   * This test contains two jobs with fewer required tasks than there are slots.
+   * We check that all tasks are assigned, but job 1 gets them first because it
+   * was submitted earlier.
+   */
+  public void testSmallJobs() throws IOException {
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(2,    info1.neededMaps);
+    assertEquals(1,    info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables; the fair shares should now have been allocated
+    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(2,    info1.neededMaps);
+    assertEquals(1,    info1.neededReduces);
+    assertEquals(400,  info1.mapDeficit);
+    assertEquals(400,  info1.reduceDeficit);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(1,    info2.neededMaps);
+    assertEquals(2,    info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Assign tasks and check that all slots are filled with j1, then j2
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(2,  info1.runningMaps);
+    assertEquals(1,  info1.runningReduces);
+    assertEquals(0,  info1.neededMaps);
+    assertEquals(0,  info1.neededReduces);
+    assertEquals(1,  info2.runningMaps);
+    assertEquals(2,  info2.runningReduces);
+    assertEquals(0, info2.neededMaps);
+    assertEquals(0, info2.neededReduces);
+  }
+  
+  /**
+   * This test begins by submitting two jobs with 10 maps and reduces each.
+   * The second job is submitted 100 ms after the first, during which time no
+   * tasks run. After this, we assign tasks to all slots, which should all be
+   * from job 1. These run for 200 ms, at which point job 2 has a deficit of
+   * 400 while job 1 is down to a deficit of 0. We then finish all tasks and
+   * assign new ones, which should all be from job 2. These run for 50 ms,
+   * which is not enough time for job 2 to make up its deficit (it makes up
+   * only 100 of its 400). Finally we assign a new round of tasks, which
+   * should all be from job 2 again.
+   */
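+  // A sketch of the deficit arithmetic these numbers assume: on every
+  // update, deficit += (fairShare - runningTasks) * elapsedMillis, per task
+  // type. E.g. job 1 alone at fair share 4 for 100 ms accumulates
+  // 4 * 100 = 400, and a job running 4 tasks at fair share 2 for 50 ms
+  // makes up (4 - 2) * 50 = 100 of its deficit.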
+  public void testLargeJobs() throws IOException {
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables; the fair shares should now have been allocated
+    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(400,  info1.mapDeficit);
+    assertEquals(400,  info1.reduceDeficit);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(10,   info2.neededMaps);
+    assertEquals(10,   info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Assign tasks and check that all slots are initially filled with job 1
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(4,  info1.runningMaps);
+    assertEquals(4,  info1.runningReduces);
+    assertEquals(6,  info1.neededMaps);
+    assertEquals(6,  info1.neededReduces);
+    assertEquals(0,  info2.runningMaps);
+    assertEquals(0,  info2.runningReduces);
+    assertEquals(10, info2.neededMaps);
+    assertEquals(10, info2.neededReduces);
+    
+    // Finish up the tasks and advance time again. Note that we must finish
+    // the task since FakeJobInProgress does not properly maintain running
+    // tasks, so the scheduler will always get an empty task list from
+    // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000004_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000005_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000006_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000007_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000008_0");
+    advanceTime(200);
+    assertEquals(0,   info1.runningMaps);
+    assertEquals(0,   info1.runningReduces);
+    assertEquals(0,   info1.mapDeficit);
+    assertEquals(0,   info1.reduceDeficit);
+    assertEquals(0,   info2.runningMaps);
+    assertEquals(0,   info2.runningReduces);
+    assertEquals(400, info2.mapDeficit);
+    assertEquals(400, info2.reduceDeficit);
+
+    // Assign tasks and check that all slots are now filled with job 2
+    checkAssignment("tt1", "attempt_test_0002_m_000009_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000010_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000011_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000012_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000013_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000014_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000015_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000016_0 on tt2");
+
+    // Finish up the tasks and advance time again, but give job 2 only 50ms.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000009_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000010_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000011_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000012_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000013_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000014_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000015_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000016_0");
+    advanceTime(50);
+    assertEquals(0,   info1.runningMaps);
+    assertEquals(0,   info1.runningReduces);
+    assertEquals(100, info1.mapDeficit);
+    assertEquals(100, info1.reduceDeficit);
+    assertEquals(0,   info2.runningMaps);
+    assertEquals(0,   info2.runningReduces);
+    assertEquals(300, info2.mapDeficit);
+    assertEquals(300, info2.reduceDeficit);
+
+    // Assign tasks and check that all slots are again filled with job 2
+    checkAssignment("tt1", "attempt_test_0002_m_000017_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000018_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000019_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000020_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000021_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000022_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000023_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000024_0 on tt2");
+  }
+  
+
+  /**
+   * We submit two jobs such that one has 2x the priority of the other, wait
+   * for 100 ms, and check that the weights/deficits are okay and that the
+   * tasks all go to the high-priority job.
+   */
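+  // The expected fair shares below assume weights proportional to priority
+  // (1 for NORMAL, 2 for HIGH): share_i = 4 slots * weight_i / sum(weights),
+  // giving job 1 about 4/3 = 1.33 and job 2 about 8/3 = 2.66 per task type.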
+  public void testJobsWithPriorities() throws IOException {
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    job2.setPriority(JobPriority.HIGH);
+    scheduler.update();
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(1.33, info1.mapFairShare, 0.1);
+    assertEquals(1.33, info1.reduceFairShare, 0.1);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(10,   info2.neededMaps);
+    assertEquals(10,   info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.66, info2.mapFairShare, 0.1);
+    assertEquals(2.66, info2.reduceFairShare, 0.1);
+    
+    // Advance time and check deficits
+    advanceTime(100);
+    assertEquals(133,  info1.mapDeficit, 1.0);
+    assertEquals(133,  info1.reduceDeficit, 1.0);
+    assertEquals(266,  info2.mapDeficit, 1.0);
+    assertEquals(266,  info2.reduceDeficit, 1.0);
+    
+    // Assign tasks and check that all slots go to j2, the high-priority job
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+  }
+  
+  /**
+   * This test starts by submitting three large jobs:
+   * - job1 in the default pool, at time 0
+   * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
+   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 200
+   * 
+   * After this, we sleep 100ms, until time 300. At this point, job1 has the
+   * highest map deficit, job3 the second, and job2 the third. This is because
+   * job3 has a larger map minimum share than job2, while job1 has been waiting
+   * since time 0 and has accumulated by far the most deficit. The reduce
+   * deficits are similar, except job2 comes before job3 because it has a
+   * higher reduce minimum share.
+   * 
+   * Finally, assign tasks to all slots. The maps should be assigned in the
+   * order job3, job2, job1 because 3 and 2 both have guaranteed slots and 3
+   * has a higher deficit. The reduces should be assigned as job2, job3, job1.
+   */
+  public void testLargeJobsWithPools() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 1 map, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>1</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    // Give pool B a minimum of 2 maps, 1 reduce
+    out.println("<pool name=\"poolB\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time 200ms and submit jobs 2 and 3
+    advanceTime(200);
+    assertEquals(800,  info1.mapDeficit);
+    assertEquals(800,  info1.reduceDeficit);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info3 = scheduler.infos.get(job3);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(0,    info1.minMaps);
+    assertEquals(0,    info1.minReduces);
+    assertEquals(1.0,  info1.mapFairShare);
+    assertEquals(1.0,  info1.reduceFairShare);
+    assertEquals(1,    info2.minMaps);
+    assertEquals(2,    info2.minReduces);
+    assertEquals(1.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(2,    info3.minMaps);
+    assertEquals(1,    info3.minReduces);
+    assertEquals(2.0,  info3.mapFairShare);
+    assertEquals(1.0,  info3.reduceFairShare);
+    
+    // Advance time 100ms and check deficits
+    advanceTime(100);
+    assertEquals(900,  info1.mapDeficit);
+    assertEquals(900,  info1.reduceDeficit);
+    assertEquals(100,  info2.mapDeficit);
+    assertEquals(200,  info2.reduceDeficit);
+    assertEquals(200,  info3.mapDeficit);
+    assertEquals(100,  info3.reduceDeficit);
+    
+    // Assign tasks and check that slots are first given to needy jobs
+    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
+
+  /**
+   * This test starts by submitting three large jobs:
+   * - job1 in the default pool, at time 0
+   * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
+   * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
+   * 
+   * After this, we sleep 100ms, until time 400. At this point, job1 has the
+   * highest deficit, job2 the second, and job3 the third. The first two tasks
+   * should be assigned to job2 and job3 since they are in a pool with an
+   * allocation guarantee, but the next two slots should be assigned to job 1
+   * because the pool will no longer be needy.
+   */
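+  // Note how poolA's minimum of 2 maps / 2 reduces is split evenly between
+  // jobs 2 and 3 once both are in the pool (minMaps/minReduces drop from 2
+  // to 1 each), while the 4 slots of each type are fair-shared at about
+  // 1.33 apiece across all three jobs.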
+  public void testLargeJobsWithExcessCapacity() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 2 maps, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time 200ms and submit job 2
+    advanceTime(200);
+    assertEquals(800,  info1.mapDeficit);
+    assertEquals(800,  info1.reduceDeficit);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(0,    info1.minMaps);
+    assertEquals(0,    info1.minReduces);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(2,    info2.minMaps);
+    assertEquals(2,    info2.minReduces);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Advance time 100ms and submit job 3
+    advanceTime(100);
+    assertEquals(1000, info1.mapDeficit);
+    assertEquals(1000, info1.reduceDeficit);
+    assertEquals(200,  info2.mapDeficit);
+    assertEquals(200,  info2.reduceDeficit);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info3 = scheduler.infos.get(job3);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(0,    info1.minMaps);
+    assertEquals(0,    info1.minReduces);
+    assertEquals(1.33, info1.mapFairShare, 0.1);
+    assertEquals(1.33, info1.reduceFairShare, 0.1);
+    assertEquals(1,    info2.minMaps);
+    assertEquals(1,    info2.minReduces);
+    assertEquals(1.33, info2.mapFairShare, 0.1);
+    assertEquals(1.33, info2.reduceFairShare, 0.1);
+    assertEquals(1,    info3.minMaps);
+    assertEquals(1,    info3.minReduces);
+    assertEquals(1.33, info3.mapFairShare, 0.1);
+    assertEquals(1.33, info3.reduceFairShare, 0.1);
+    
+    // Advance time 100ms and check deficits
+    advanceTime(100);
+    assertEquals(1133, info1.mapDeficit, 1.0);
+    assertEquals(1133, info1.reduceDeficit, 1.0);
+    assertEquals(333,  info2.mapDeficit, 1.0);
+    assertEquals(333,  info2.reduceDeficit, 1.0);
+    assertEquals(133,  info3.mapDeficit, 1.0);
+    assertEquals(133,  info3.reduceDeficit, 1.0);
+    
+    // Assign tasks and check that slots are first given to needy jobs, and
+    // that job 1 then fills the remaining two slots of each type because of
+    // its larger deficit.
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
+  
+  /**
+   * This test starts by submitting two jobs at time 0:
+   * - job1 in the default pool
+   * - job2, with 1 map and 1 reduce, in poolA, which has an alloc of 4
+   *   maps and 4 reduces
+   * 
+   * When we assign the slots, job2 should only get 1 of each type of task.
+   * 
+   * The fair share for job 2 should still be 2.0, however, because even
+   * though it is running only one task, it accumulates deficit in case it
+   * later has failures or needs speculative tasks. (TODO: This may not be a
+   * good policy.)
+   */
+  public void testSmallJobInLargePool() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 4 maps, 4 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>4</minMaps>");
+    out.println("<minReduces>4</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(1,    info2.neededMaps);
+    assertEquals(1,    info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Assign tasks and check that slots are first given to needy jobs
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
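+
+  // Editor's sketch of the deficit-accumulation rule the Javadoc above
+  // alludes to (assumed form; the real update runs inside the scheduler's
+  // update loop): a job running fewer tasks than its fair share builds up
+  // deficit over time, which is why job 2 keeps a 2.0 share while running
+  // only one task of each type.
+  private static void accumulateDeficitSketch(JobInfo info, long timeDelta) {
+    info.mapDeficit    += (info.mapFairShare    - info.runningMaps)    * timeDelta;
+    info.reduceDeficit += (info.reduceFairShare - info.runningReduces) * timeDelta;
+  }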
+  
+  /**
+   * This test starts by submitting four jobs in the default pool. However, the
+   * maxRunningJobs limit for this pool has been set to two. We should see only
+   * the first two jobs get scheduled, each with half the total slots.
+   */
+  public void testPoolMaxJobs() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"default\">");
+    out.println("<maxRunningJobs>2</maxRunningJobs>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(10);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info4 = scheduler.infos.get(job4);
+    
+    // Check scheduler variables
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(0.0,  info3.mapFairShare);
+    assertEquals(0.0,  info3.reduceFairShare);
+    assertEquals(0.0,  info4.mapFairShare);
+    assertEquals(0.0,  info4.reduceFairShare);
+    
+    // Assign tasks and check that slots go first to jobs 1 and 2
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+  }
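+
+  // Editor's sketch of the cap this test exercises (illustrative only; the
+  // scheduler's real bookkeeping lives elsewhere): jobs are considered in
+  // submission order, and only the first maxRunningJobs in a pool are
+  // runnable, which is why jobs 3 and 4 get a zero fair share above.
+  private static List<JobInProgress> runnablePrefixSketch(
+      List<JobInProgress> poolJobsByStartTime, int maxRunningJobs) {
+    int cut = Math.min(maxRunningJobs, poolJobsByStartTime.size());
+    return poolJobsByStartTime.subList(0, cut);  // first N run, rest wait
+  }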
+
+  /**
+   * This test starts by submitting two jobs by user "user1" to the default
+   * pool, and two jobs by "user2". We set user1's job limit to 1. We should
+   * see one job from user1 and two from user2. 
+   */
+  public void testUserMaxJobs() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningJobs>1</maxRunningJobs>");
+    out.println("</user>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    job1.getJobConf().set("user.name", "user1");
+    JobInfo info1 = scheduler.infos.get(job1);
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    job2.getJobConf().set("user.name", "user1");
+    JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(10);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    job3.getJobConf().set("user.name", "user2");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    job4.getJobConf().set("user.name", "user2");
+    JobInfo info4 = scheduler.infos.get(job4);
+    
+    // Check scheduler variables
+    assertEquals(1.33,  info1.mapFairShare, 0.1);
+    assertEquals(1.33,  info1.reduceFairShare, 0.1);
+    assertEquals(0.0,   info2.mapFairShare);
+    assertEquals(0.0,   info2.reduceFairShare);
+    assertEquals(1.33,  info3.mapFairShare, 0.1);
+    assertEquals(1.33,  info3.reduceFairShare, 0.1);
+    assertEquals(1.33,  info4.mapFairShare, 0.1);
+    assertEquals(1.33,  info4.reduceFairShare, 0.1);
+    
+    // Assign tasks and check that slots go first to jobs 1 and 3
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0003_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000008_0 on tt2");
+  }
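+
+  // Editor's worked check of the 1.33 shares asserted above: user1's cap of
+  // one job leaves three runnable jobs (job1, job3, job4) to split the
+  // cluster's 4 slots of each type, i.e. 4.0 / 3.0 = 1.33 per job.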
+  
+  /**
+   * Test a combination of pool job limits and user job limits, the latter
+   * specified through both the userMaxJobsDefault element (for users without
+   * their own limit) and user-specific &lt;user&gt; elements in the
+   * allocations file.
+   */
+  public void testComplexJobLimits() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<maxRunningJobs>1</maxRunningJobs>");
+    out.println("</pool>");
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningJobs>1</maxRunningJobs>");
+    out.println("</user>");
+    out.println("<user name=\"user2\">");
+    out.println("<maxRunningJobs>10</maxRunningJobs>");
+    out.println("</user>");
+    out.println("<userMaxJobsDefault>2</userMaxJobsDefault>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    
+    // Two jobs for user1; only one should get to run
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    job1.getJobConf().set("user.name", "user1");
+    JobInfo info1 = scheduler.infos.get(job1);
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    job2.getJobConf().set("user.name", "user1");
+    JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(10);
+    
+    // Three jobs for user2; all should get to run
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    job3.getJobConf().set("user.name", "user2");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    job4.getJobConf().set("user.name", "user2");
+    JobInfo info4 = scheduler.infos.get(job4);
+    advanceTime(10);
+    JobInProgress job5 = submitJob(JobStatus.RUNNING, 10, 10);
+    job5.getJobConf().set("user.name", "user2");
+    JobInfo info5 = scheduler.infos.get(job5);
+    advanceTime(10);
+    
+    // Three jobs for user3; only two should get to run
+    JobInProgress job6 = submitJob(JobStatus.RUNNING, 10, 10);
+    job6.getJobConf().set("user.name", "user3");
+    JobInfo info6 = scheduler.infos.get(job6);
+    advanceTime(10);
+    JobInProgress job7 = submitJob(JobStatus.RUNNING, 10, 10);
+    job7.getJobConf().set("user.name", "user3");
+    JobInfo info7 = scheduler.infos.get(job7);
+    advanceTime(10);
+    JobInProgress job8 = submitJob(JobStatus.RUNNING, 10, 10);
+    job8.getJobConf().set("user.name", "user3");
+    JobInfo info8 = scheduler.infos.get(job8);
+    advanceTime(10);
+    
+    // Two jobs for user4, in poolA; only one should get to run
+    JobInProgress job9 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    job9.getJobConf().set("user.name", "user4");
+    JobInfo info9 = scheduler.infos.get(job9);
+    advanceTime(10);
+    JobInProgress job10 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    job10.getJobConf().set("user.name", "user4");
+    JobInfo info10 = scheduler.infos.get(job10);
+    advanceTime(10);
+    
+    // Check scheduler variables
+    double SHARE = 4.0 / 7.0; // We have 4 slots and 7 runnable jobs
+    assertEquals(SHARE,  info1.mapFairShare, 0.1);
+    assertEquals(SHARE,  info1.reduceFairShare, 0.1);
+    assertEquals(0.0,    info2.mapFairShare);
+    assertEquals(0.0,    info2.reduceFairShare);
+    assertEquals(SHARE,  info3.mapFairShare, 0.1);
+    assertEquals(SHARE,  info3.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info4.mapFairShare, 0.1);
+    assertEquals(SHARE,  info4.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info5.mapFairShare, 0.1);
+    assertEquals(SHARE,  info5.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info6.mapFairShare, 0.1);
+    assertEquals(SHARE,  info6.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info7.mapFairShare, 0.1);
+    assertEquals(SHARE,  info7.reduceFairShare, 0.1);
+    assertEquals(0.0,    info8.mapFairShare);
+    assertEquals(0.0,    info8.reduceFairShare);
+    assertEquals(SHARE,  info9.mapFairShare, 0.1);
+    assertEquals(SHARE,  info9.reduceFairShare, 0.1);
+    assertEquals(0.0,    info10.mapFairShare);
+    assertEquals(0.0,    info10.reduceFairShare);
+  }
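+
+  // Editor's worked tally behind SHARE = 4.0 / 7.0 above:
+  //   user1 (cap 1):               job1          -> 1 runnable (job2 blocked)
+  //   user2 (cap 10):              job3 to job5  -> 3 runnable
+  //   user3 (default cap 2):       job6, job7    -> 2 runnable (job8 blocked)
+  //   user4 in poolA (pool cap 1): job9          -> 1 runnable (job10 blocked)
+  // That is 7 runnable jobs sharing 4 slots of each type.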
+  
+  public void testSizeBasedWeight() throws Exception {
+    scheduler.sizeBasedWeight = true;
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 20, 1);
+    assertTrue(scheduler.infos.get(job2).mapFairShare >
+               scheduler.infos.get(job1).mapFairShare);
+    assertTrue(scheduler.infos.get(job1).reduceFairShare >
+               scheduler.infos.get(job2).reduceFairShare);
+  }
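+
+  // Editor's sketch of what sizeBasedWeight changes (assumed log-scale form;
+  // the exact formula belongs to the scheduler and is not shown in this
+  // test): a job's weight grows with its remaining task count, so job2's 20
+  // maps outweigh job1's 2 while job1's 10 reduces outweigh job2's 1.
+  private static double sizeBasedWeightSketch(int runnableTasks) {
+    return Math.log1p(runnableTasks) / Math.log(2);  // roughly log2(tasks + 1)
+  }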
+  
+  private void advanceTime(long time) {
+    clock.advance(time);
+    scheduler.update();
+  }
+
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected void checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+  }
+  
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=690093&r1=690092&r2=690093&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Aug 28 20:50:06 2008
@@ -832,4 +832,7 @@
     rawSplit.clearBytes();
   }
 
+  TreeMap<TaskAttemptID, String> getActiveTasks() {
+    return activeTasks;
+  }
 }
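
The new package-private accessor above exposes the TIP's map from running
task attempts to the trackers hosting them. A plausible caller in the fair
scheduler might walk it as sketched below (illustrative only; the real call
site is not part of this diff, tip stands for some TaskInProgress instance,
and the loop assumes java.util.Map is imported):

    for (Map.Entry<TaskAttemptID, String> entry : tip.getActiveTasks().entrySet()) {
      TaskAttemptID attempt = entry.getKey();   // a running attempt of this TIP
      String trackerName = entry.getValue();    // task tracker running it
      // count or inspect the attempt here
    }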


