hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r678193 - in /hadoop/core/trunk: conf/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Sat, 19 Jul 2008 17:50:53 GMT
Author: omalley
Date: Sat Jul 19 10:50:52 2008
New Revision: 678193

URL: http://svn.apache.org/viewvc?rev=678193&view=rev
Log:
HADOOP-3412.  Factor the scheduler out of the JobTracker and make it pluggable.
(Tom White and Brice Arnould via omalley)

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java
Modified:
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=678193&r1=678192&r2=678193&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Sat Jul 19 10:50:52 2008
@@ -702,6 +702,20 @@
 </property>
 
 <property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
+  <description>The class responsible for scheduling the tasks.</description>
+</property>
+
+<property>
+  <name>mapred.jobtracker.taskScheduler.maxRunningTasksPerJob</name>
+  <value></value>
+  <description>The maximum number of running tasks for a job before
+  it gets preempted. No limits if undefined.
+  </description>
+</property>
+
+<property>
   <name>mapred.map.max.attempts</name>
   <value>4</value>
   <description>Expert: The maximum number of attempts per map task.

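A quick illustration of how these two properties combine (a sketch only; in
practice they would be set in hadoop-site.xml on the JobTracker host rather
than built programmatically, and the 20-task cap is an invented example):

    JobConf conf = new JobConf();
    // Select the scheduler implementation; JobQueueTaskScheduler is the
    // default when the property is left unset.
    conf.set("mapred.jobtracker.taskScheduler",
        "org.apache.hadoop.mapred.LimitTasksPerJobTaskScheduler");
    // Cap each job at 20 concurrently running tasks (maps plus reduces).
    conf.setLong("mapred.jobtracker.taskScheduler.maxRunningTasksPerJob", 20);
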
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A {@link JobInProgressListener} which initializes the tasks for a job as soon
+ * as the job is added (using the {@link #jobAdded(JobInProgress)} method).
+ */
+class EagerTaskInitializationListener extends JobInProgressListener {
+  
+  private static final Log LOG = LogFactory.getLog(
+      EagerTaskInitializationListener.class.getName());
+  
+  /////////////////////////////////////////////////////////////////
+  //  Used to init new jobs that have just been created
+  /////////////////////////////////////////////////////////////////
+  class JobInitThread implements Runnable {
+    public void run() {
+      JobInProgress job;
+      while (true) {
+        job = null;
+        try {
+          synchronized (jobInitQueue) {
+            while (jobInitQueue.isEmpty()) {
+              jobInitQueue.wait();
+            }
+            job = jobInitQueue.remove(0);
+          }
+          job.initTasks();
+        } catch (InterruptedException t) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Job initialization failed:\n" +
+                    StringUtils.stringifyException(t));
+          if (job != null) {
+            job.kill();
+          }
+        }
+      }
+    }
+  }
+  
+  private JobInitThread initJobs = new JobInitThread();
+  private Thread initJobsThread;
+  private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+  
+  public void start() throws IOException {
+    this.initJobsThread = new Thread(initJobs, "initJobs");
+    this.initJobsThread.start();
+  }
+  
+  public void terminate() throws IOException {
+    if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
+      LOG.info("Stopping initer");
+      this.initJobsThread.interrupt();
+      try {
+        this.initJobsThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * We add the JIP to the jobInitQueue, which is processed 
+   * asynchronously to handle split-computation and build up
+   * the right TaskTracker/Block mapping.
+   */
+  @Override
+  public void jobAdded(JobInProgress job) {
+    synchronized (jobInitQueue) {
+      jobInitQueue.add(job);
+      resortInitQueue();
+      jobInitQueue.notifyAll();
+    }
+
+  }
+  
+  /**
+   * Sort jobs by priority and then by start time.
+   */
+  private synchronized void resortInitQueue() {
+    Comparator<JobInProgress> comp = new Comparator<JobInProgress>() {
+      public int compare(JobInProgress o1, JobInProgress o2) {
+        int res = o1.getPriority().compareTo(o2.getPriority());
+        if(res == 0) {
+          if(o1.getStartTime() < o2.getStartTime())
+            res = -1;
+          else
+            res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1);
+        }
+          
+        return res;
+      }
+    };
+    
+    synchronized (jobInitQueue) {
+      Collections.sort(jobInitQueue, comp);
+    }
+  }
+
+  @Override
+  public void jobRemoved(JobInProgress job) {
+    synchronized (jobInitQueue) {
+      jobInitQueue.remove(job);
+    }
+  }
+
+  @Override
+  public void jobUpdated(JobInProgress job) {
+    synchronized (jobInitQueue) {
+      resortInitQueue();
+    }
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=678193&r1=678192&r2=678193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sat Jul 19 10:50:52 2008
@@ -154,6 +154,16 @@
     new TreeMap<TaskAttemptID, Integer>();
   
   /**
+   * Create an almost empty JobInProgress, which can be used only for tests
+   */
+  protected JobInProgress(JobID jobid, JobConf conf) {
+    this.conf = conf;
+    this.jobId = jobid;
+    this.numMapTasks = conf.getNumMapTasks();
+    this.numReduceTasks = conf.getNumReduceTasks();
+  }
+  
+  /**
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    */
@@ -1775,4 +1785,11 @@
       return metrics;
     }
   }
+  
+  /**
+   * @return The JobID of this JobInProgress.
+   */
+  public JobID getJobID() {
+    return jobId;
+  }
 }

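The new protected constructor gives tests a way to build a JobInProgress
without a live JobTracker. A hypothetical fixture in the spirit of the new
scheduler tests (the class name and job identifier below are invented):

    // Must live in org.apache.hadoop.mapred to reach the protected
    // constructor.
    class FakeJobInProgress extends JobInProgress {
      FakeJobInProgress(JobConf conf) {
        super(new JobID("test", 1), conf);
      }
    }
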
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
+ * {@link JobTracker}.
+ */
+abstract class JobInProgressListener {
+
+  /**
+   * Invoked when a new job has been added to the {@link JobTracker}.
+   * @param job The added job.
+   */
+  public abstract void jobAdded(JobInProgress job);
+
+  /**
+   * Invoked when a job has been removed from the {@link JobTracker}.
+   * @param job The removed job.
+   */
+  public abstract void jobRemoved(JobInProgress job);
+  
+  /**
+   * Invoked when a job has been updated in the {@link JobTracker}.
+   * @param job The updated job.
+   */
+  public abstract void jobUpdated(JobInProgress job);
+}

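Any component interested in job lifecycle events can now extend this class
and register itself with the JobTracker. A minimal sketch (the class below
is invented for illustration) that merely logs each callback:

    package org.apache.hadoop.mapred;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    class LoggingJobInProgressListener extends JobInProgressListener {
      private static final Log LOG =
        LogFactory.getLog(LoggingJobInProgressListener.class);

      @Override
      public void jobAdded(JobInProgress job) {
        LOG.info("Job added: " + job.getJobID());
      }

      @Override
      public void jobRemoved(JobInProgress job) {
        LOG.info("Job removed: " + job.getJobID());
      }

      @Override
      public void jobUpdated(JobInProgress job) {
        LOG.info("Job updated: " + job.getJobID());
      }
    }

It would be wired up through TaskTrackerManager.addJobInProgressListener,
as the schedulers below do.
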
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+/**
+ * A {@link JobInProgressListener} that maintains the jobs being managed in
+ * a queue. By default the queue is FIFO, but it is possible to use custom
+ * queue ordering by using the
+ * {@link #JobQueueJobInProgressListener(Collection)} constructor.
+ */
+class JobQueueJobInProgressListener extends JobInProgressListener {
+
+  private static final Comparator<JobInProgress> FIFO_JOB_QUEUE_COMPARATOR
+    = new Comparator<JobInProgress>() {
+    public int compare(JobInProgress o1, JobInProgress o2) {
+      int res = o1.getPriority().compareTo(o2.getPriority());
+      if (res == 0) {
+        if (o1.getStartTime() < o2.getStartTime()) {
+          res = -1;
+        } else {
+          res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+        }
+      }
+      if (res == 0) {
+        res = o1.getJobID().compareTo(o2.getJobID());
+      }
+      return res;
+    }
+  };
+  
+  private Collection<JobInProgress> jobQueue;
+  
+  public JobQueueJobInProgressListener() {
+    this(new TreeSet<JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR));
+  }
+
+  /**
+   * For clients that want to provide their own job priorities.
+   * @param jobQueue A collection whose iterator returns jobs in priority order.
+   */
+  protected JobQueueJobInProgressListener(Collection<JobInProgress> jobQueue) {
+    this.jobQueue = Collections.synchronizedCollection(jobQueue);
+  }
+
+  /**
+   * Returns a synchronized view of the job queue.
+   */
+  public Collection<JobInProgress> getJobQueue() {
+    return jobQueue;
+  }
+  
+  @Override
+  public void jobAdded(JobInProgress job) {
+    jobQueue.add(job);
+  }
+
+  @Override
+  public void jobRemoved(JobInProgress job) {
+    jobQueue.remove(job);
+  }
+  
+  @Override
+  public synchronized void jobUpdated(JobInProgress job) {
+    synchronized (jobQueue) {
+      jobQueue.remove(job);
+      jobQueue.add(job);
+    }
+  }
+
+}

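The protected constructor is the hook for custom orderings: any collection
whose iterator yields jobs in priority order will do. A sketch (invented
subclass; imports of java.util.Comparator and java.util.TreeSet assumed, as
in the class above) that orders purely by start time, ignoring job priority:

    class StartTimeJobQueueListener extends JobQueueJobInProgressListener {
      StartTimeJobQueueListener() {
        super(new TreeSet<JobInProgress>(new Comparator<JobInProgress>() {
          public int compare(JobInProgress o1, JobInProgress o2) {
            int res = Long.valueOf(o1.getStartTime())
              .compareTo(Long.valueOf(o2.getStartTime()));
            // Break ties by JobID, since a TreeSet silently drops
            // elements that compare as equal.
            return res != 0 ? res : o1.getJobID().compareTo(o2.getJobID());
          }
        }));
      }
    }
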
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
+ * by default).
+ */
+class JobQueueTaskScheduler extends TaskScheduler {
+  
+  private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
+  
+  protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
+  private EagerTaskInitializationListener eagerTaskInitializationListener;
+  private float padFraction;
+  
+  public JobQueueTaskScheduler() {
+    this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
+    this.eagerTaskInitializationListener =
+      new EagerTaskInitializationListener();
+  }
+  
+  @Override
+  public synchronized void start() throws IOException {
+    super.start();
+    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
+    
+    eagerTaskInitializationListener.start();
+    taskTrackerManager.addJobInProgressListener(
+        eagerTaskInitializationListener);
+  }
+  
+  @Override
+  public synchronized void terminate() throws IOException {
+    if (jobQueueJobInProgressListener != null) {
+      taskTrackerManager.removeJobInProgressListener(
+          jobQueueJobInProgressListener);
+    }
+    if (eagerTaskInitializationListener != null) {
+      taskTrackerManager.removeJobInProgressListener(
+          eagerTaskInitializationListener);
+      eagerTaskInitializationListener.terminate();
+    }
+    super.terminate();
+  }
+  
+  @Override
+  public synchronized void setConf(Configuration conf) {
+    super.setConf(conf);
+    padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
+                                 0.01f);
+  }
+
+  @Override
+  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+      throws IOException {
+    //
+    // Compute average map and reduce task numbers across pool
+    //
+    int remainingReduceLoad = 0;
+    int remainingMapLoad = 0;
+
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+    synchronized (jobQueue) {
+      for (JobInProgress job : jobQueue) {
+        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+          int totalMapTasks = job.desiredMaps();
+          int totalReduceTasks = job.desiredReduces();
+          remainingMapLoad += (totalMapTasks - job.finishedMaps());
+          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+        }
+      }
+    }
+
+    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
+    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
+    // find out the maximum number of maps or reduces that we are willing
+    // to run on any node.
+    int maxMapLoad = 0;
+    int maxReduceLoad = 0;
+    if (numTaskTrackers > 0) {
+      maxMapLoad = Math.min(maxCurrentMapTasks,
+                            (int) Math.ceil((double) remainingMapLoad / 
+                                            numTaskTrackers));
+      maxReduceLoad = Math.min(maxCurrentReduceTasks,
+                               (int) Math.ceil((double) remainingReduceLoad
+                                               / numTaskTrackers));
+    }
+        
+    //
+    // Get map + reduce counts for the current tracker.
+    //
+
+    int numMaps = taskTracker.countMapTasks();
+    int numReduces = taskTracker.countReduceTasks();
+    
+    int totalMaps = clusterStatus.getMapTasks();
+    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
+    int totalReduces = clusterStatus.getReduceTasks();
+    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();
+
+    //
+    // In the below steps, we allocate first a map task (if appropriate),
+    // and then a reduce task if appropriate.  We go through all jobs
+    // in order of job arrival; jobs only get serviced if their 
+    // predecessors are serviced, too.
+    //
+
+    //
+    // We hand a task to the current taskTracker if the given machine 
+    // has a workload that's less than the maximum load of that kind of
+    // task.
+    //
+       
+    if (numMaps < maxMapLoad) {
+
+      int totalNeededMaps = 0;
+      synchronized (jobQueue) {
+        for (JobInProgress job : jobQueue) {
+          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
+
+          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+              taskTrackerManager.getNumberOfUniqueHosts());
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+
+          //
+          // Beyond the highest-priority task, reserve a little 
+          // room for failures and speculative executions; don't 
+          // schedule tasks to the hilt.
+          //
+          totalNeededMaps += job.desiredMaps();
+          int padding = 0;
+          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+            padding = Math.min(maxCurrentMapTasks,
+                               (int)(totalNeededMaps * padFraction));
+          }
+          if (totalMaps + padding >= totalMapTaskCapacity) {
+            break;
+          }
+        }
+      }
+    }
+
+    //
+    // Same thing, but for reduce tasks
+    //
+    if (numReduces < maxReduceLoad) {
+
+      int totalNeededReduces = 0;
+      synchronized (jobQueue) {
+        for (JobInProgress job : jobQueue) {
+          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+              job.numReduceTasks == 0) {
+            continue;
+          }
+
+          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+              taskTrackerManager.getNumberOfUniqueHosts());
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+
+          //
+          // Beyond the highest-priority task, reserve a little 
+          // room for failures and speculative executions; don't 
+          // schedule tasks to the hilt.
+          //
+          totalNeededReduces += job.desiredReduces();
+          int padding = 0;
+          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+            padding = 
+              Math.min(maxCurrentReduceTasks,
+                       (int) (totalNeededReduces * padFraction));
+          }
+          if (totalReduces + padding >= totalReduceTaskCapacity) {
+            break;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+}

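To make the padding heuristic above concrete (numbers invented for
illustration): with the default padFraction of 0.01, a tracker allowing
maxCurrentMapTasks = 4, and jobs so far in the queue wanting
totalNeededMaps = 500 maps, the padding is min(4, (int)(500 * 0.01)) = 4
slots, and the walk down the queue stops as soon as totalMaps + 4 would
reach the cluster's map capacity, provided the cluster has more than
MIN_CLUSTER_SIZE_FOR_PADDING (3) trackers; smaller clusters skip the
padding entirely.
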
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=678193&r1=678192&r2=678193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sat Jul 19 10:50:52 2008
@@ -39,6 +39,8 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -73,13 +75,12 @@
  * tracking MR jobs in a network environment.
  *
  *******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {
+public class JobTracker implements MRConstants, InterTrackerProtocol,
+    JobSubmissionProtocol, TaskTrackerManager {
+
   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long RETIRE_JOB_INTERVAL;
   static long RETIRE_JOB_CHECK_INTERVAL;
-  static float TASK_ALLOC_EPSILON;
-  static float PAD_FRACTION;
-  static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
@@ -89,6 +90,9 @@
   private ResolutionThread resThread = new ResolutionThread();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
+  private final TaskScheduler taskScheduler;
+  private final List<JobInProgressListener> jobInProgressListeners =
+    new CopyOnWriteArrayList<JobInProgressListener>();
 
   // system directories are world-wide readable and owner readable
   final static FsPermission SYSTEM_DIR_PERMISSION =
@@ -130,6 +134,7 @@
     while (true) {
       try {
         result = new JobTracker(conf);
+        result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
         throw e;
@@ -340,8 +345,8 @@
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
           long retireBefore = System.currentTimeMillis() - 
             RETIRE_JOB_INTERVAL;
-          synchronized (jobsByPriority) {
-            for(JobInProgress job: jobsByPriority) {
+          synchronized (jobs) {
+            for(JobInProgress job: jobs.values()) {
               if (job.getStatus().getRunState() != JobStatus.RUNNING &&
                   job.getStatus().getRunState() != JobStatus.PREP &&
                   (job.getFinishTime()  < retireBefore)) {
@@ -352,28 +357,27 @@
           if (!retiredJobs.isEmpty()) {
             synchronized (JobTracker.this) {
               synchronized (jobs) {
-                synchronized (jobsByPriority) {
-                  synchronized (jobInitQueue) {
-                    for (JobInProgress job: retiredJobs) {
-                      removeJobTasks(job);
-                      jobs.remove(job.getProfile().getJobID());
-                      jobInitQueue.remove(job);
-                      jobsByPriority.remove(job);
-                      String jobUser = job.getProfile().getUser();
-                      synchronized (userToJobsMap) {
-                        ArrayList<JobInProgress> userJobs =
-                          userToJobsMap.get(jobUser);
-                        synchronized (userJobs) {
-                          userJobs.remove(job);
-                        }
-                        if (userJobs.isEmpty()) {
-                          userToJobsMap.remove(jobUser);
-                        }
+                synchronized (taskScheduler) {
+                  for (JobInProgress job: retiredJobs) {
+                    removeJobTasks(job);
+                    jobs.remove(job.getProfile().getJobID());
+                    for (JobInProgressListener l : jobInProgressListeners) {
+                      l.jobRemoved(job);
+                    }
+                    String jobUser = job.getProfile().getUser();
+                    synchronized (userToJobsMap) {
+                      ArrayList<JobInProgress> userJobs =
+                        userToJobsMap.get(jobUser);
+                      synchronized (userJobs) {
+                        userJobs.remove(job);
+                      }
+                      if (userJobs.isEmpty()) {
+                        userToJobsMap.remove(jobUser);
                       }
-                      LOG.info("Retired job with id: '" + 
-                               job.getProfile().getJobID() + "' of user '" +
-                               jobUser + "'");
                     }
+                    LOG.info("Retired job with id: '" + 
+                             job.getProfile().getJobID() + "' of user '" +
+                             jobUser + "'");
                   }
                 }
               }
@@ -389,37 +393,6 @@
     }
   }
 
-  /////////////////////////////////////////////////////////////////
-  //  Used to init new jobs that have just been created
-  /////////////////////////////////////////////////////////////////
-  class JobInitThread implements Runnable {
-    public JobInitThread() {
-    }
-    public void run() {
-      JobInProgress job;
-      while (true) {
-        job = null;
-        try {
-          synchronized (jobInitQueue) {
-            while (jobInitQueue.isEmpty()) {
-              jobInitQueue.wait();
-            }
-            job = jobInitQueue.remove(0);
-          }
-          job.initTasks();
-        } catch (InterruptedException t) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Job initialization failed:\n" +
-                    StringUtils.stringifyException(t));
-          if (job != null) {
-            job.kill();
-          }
-        }
-      }
-    }
-  }
-
   static class JobTrackerMetrics implements Updater {
     private MetricsRecord metricsRecord = null;
     private int numMapTasksLaunched = 0;
@@ -528,7 +501,6 @@
 
   // All the known jobs.  (jobid->JobInProgress)
   Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
-  List<JobInProgress> jobsByPriority = new ArrayList<JobInProgress>();
 
   // (user -> list of JobInProgress)
   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
@@ -568,14 +540,11 @@
   int totalReduces = 0;
   private HashMap<String, TaskTrackerStatus> taskTrackers =
     new HashMap<String, TaskTrackerStatus>();
-  HashMap<String,Integer>uniqueHostsMap = new HashMap<String, Integer>();
-  List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+  Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
   RetireJobs retireJobs = new RetireJobs();
   Thread retireJobsThread = null;
-  JobInitThread initJobs = new JobInitThread();
-  Thread initJobsThread = null;
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
@@ -633,9 +602,6 @@
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
     RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
     RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
-    TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);
-    PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
-                                 0.01f);
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
 
     // This is a directory of temporary submission files.  We delete it
@@ -646,6 +612,12 @@
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
+    
+    // Create the scheduler
+    Class<? extends TaskScheduler> schedulerClass
+      = conf.getClass("mapred.jobtracker.taskScheduler",
+          JobQueueTaskScheduler.class, TaskScheduler.class);
+    taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
                                            
     // Set ports, start RPC servers, etc.
     InetSocketAddress addr = getAddress(conf);
@@ -765,15 +737,14 @@
   /**
    * Run forever
    */
-  public void offerService() throws InterruptedException {
+  public void offerService() throws InterruptedException, IOException {
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
     this.resThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
-    this.initJobsThread = new Thread(this.initJobs, "initJobs");
-    this.initJobsThread.start();
+    taskScheduler.start();
     expireLaunchingTaskThread.start();
     this.taskCommitThread = new TaskCommitQueue();
     this.taskCommitThread.start();
@@ -819,14 +790,8 @@
         ex.printStackTrace();
       }
     }
-    if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
-      LOG.info("Stopping initer");
-      this.initJobsThread.interrupt();
-      try {
-        this.initJobsThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
+    if (taskScheduler != null) {
+      taskScheduler.terminate();
     }
     if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
       LOG.info("Stopping expireLaunchingTasks");
@@ -1023,63 +988,62 @@
     // in memory; information about the purged jobs is available via
     // JobHistory.
     synchronized (jobs) {
-      synchronized (jobsByPriority) {
-        synchronized (jobInitQueue) {
-          synchronized (userToJobsMap) {
-            String jobUser = job.getProfile().getUser();
-            if (!userToJobsMap.containsKey(jobUser)) {
-              userToJobsMap.put(jobUser, 
-                                new ArrayList<JobInProgress>());
-            }
-            ArrayList<JobInProgress> userJobs = 
-              userToJobsMap.get(jobUser);
-            synchronized (userJobs) {
-              // Add the currently completed 'job'
-              userJobs.add(job);
-
-              // Check if we need to retire some jobs of this user
-              while (userJobs.size() > 
-                     MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
-                JobInProgress rjob = userJobs.get(0);
+      synchronized (taskScheduler) {
+        synchronized (userToJobsMap) {
+          String jobUser = job.getProfile().getUser();
+          if (!userToJobsMap.containsKey(jobUser)) {
+            userToJobsMap.put(jobUser, 
+                              new ArrayList<JobInProgress>());
+          }
+          ArrayList<JobInProgress> userJobs = 
+            userToJobsMap.get(jobUser);
+          synchronized (userJobs) {
+            // Add the currently completed 'job'
+            userJobs.add(job);
+
+            // Check if we need to retire some jobs of this user
+            while (userJobs.size() > 
+                   MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+              JobInProgress rjob = userJobs.get(0);
+                
+              // Do not delete 'current'
+              // finished job just yet.
+              if (rjob == job) {
+                break;
+              }
+                
+              // Cleanup all datastructures
+              int rjobRunState = 
+                rjob.getStatus().getRunState();
+              if (rjobRunState == JobStatus.SUCCEEDED || 
+                  rjobRunState == JobStatus.FAILED) {
+                // Ok, this call to removeJobTasks
+                // is dangerous in some very very obscure
+                // cases; e.g. when rjob completed, hit
+                // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
+                // limit and yet some task (taskid)
+                // wasn't complete!
+                removeJobTasks(rjob);
                   
-                // Do not delete 'current'
-                // finished job just yet.
-                if (rjob == job) {
-                  break;
+                userJobs.remove(0);
+                jobs.remove(rjob.getProfile().getJobID());
+                for (JobInProgressListener listener : jobInProgressListeners) {
+                  listener.jobRemoved(rjob);
                 }
                   
-                // Cleanup all datastructures
-                int rjobRunState = 
-                  rjob.getStatus().getRunState();
-                if (rjobRunState == JobStatus.SUCCEEDED || 
-                    rjobRunState == JobStatus.FAILED) {
-                  // Ok, this call to removeTaskEntries
-                  // is dangerous is some very very obscure
-                  // cases; e.g. when rjob completed, hit
-                  // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
-                  // limit and yet some task (taskid)
-                  // wasn't complete!
-                  removeJobTasks(rjob);
-                    
-                  userJobs.remove(0);
-                  jobs.remove(rjob.getProfile().getJobID());
-                  jobInitQueue.remove(rjob);
-                  jobsByPriority.remove(rjob);
-                    
-                  LOG.info("Retired job with id: '" + 
-                           rjob.getProfile().getJobID() + "' of user: '" +
-                           jobUser + "'");
-                } else {
-                  // Do not remove jobs that aren't complete.
-                  // Stop here, and let the next pass take
-                  // care of purging jobs.
-                  break;
-                }
+                LOG.info("Retired job with id: '" + 
+                         rjob.getProfile().getJobID() + "' of user: '" +
+                         jobUser + "'");
+              } else {
+                // Do not remove jobs that aren't complete.
+                // Stop here, and let the next pass take
+                // care of purging jobs.
+                break;
               }
             }
-            if (userJobs.isEmpty()) {
-              userToJobsMap.remove(jobUser);
-            }
+          }
+          if (userJobs.isEmpty()) {
+            userToJobsMap.remove(jobUser);
           }
         }
       }
@@ -1156,7 +1120,7 @@
     }
     return v;
   }
-  public Collection taskTrackers() {
+  public Collection<TaskTrackerStatus> taskTrackers() {
     synchronized (taskTrackers) {
       return taskTrackers.values();
     }
@@ -1226,6 +1190,19 @@
   public int getNumResolvedTaskTrackers() {
     return numResolved;
   }
+  
+  public int getNumberOfUniqueHosts() {
+    return uniqueHostsMap.size();
+  }
+  
+  public void addJobInProgressListener(JobInProgressListener listener) {
+    jobInProgressListeners.add(listener);
+  }
+
+  public void removeJobInProgressListener(JobInProgressListener listener) {
+    jobInProgressListeners.remove(listener);
+  }
+  
   ////////////////////////////////////////////////////
   // InterTrackerProtocol
   ////////////////////////////////////////////////////
@@ -1304,10 +1281,23 @@
       
     // Check for new tasks to be executed on the tasktracker
     if (acceptNewTasks) {
-      Task task = getNewTaskForTaskTracker(trackerName);
-      if (task != null) {
-        LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
-        actions.add(new LaunchTaskAction(task));
+      TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
+      if (taskTrackerStatus == null) {
+        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
+      } else {
+        List<Task> tasks = taskScheduler.assignTasks(taskTrackerStatus);
+        if (tasks != null) {
+          for (Task task : tasks) {
+            expireLaunchingTasks.addNewTask(task.getTaskID());
+            if (task.isMapTask()) {
+              myMetrics.launchMap();
+            } else {
+              myMetrics.launchReduce();
+            }
+            LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
+            actions.add(new LaunchTaskAction(task));
+          }
+        }
       }
     }
       
@@ -1520,156 +1510,6 @@
       LOG.warn(getName() + " exiting...");
     }
   }
-  
-  /**
-   * Returns a task we'd like the TaskTracker to execute right now.
-   *
-   * Eventually this function should compute load on the various TaskTrackers,
-   * and incorporate knowledge of DFS file placement.  But for right now, it
-   * just grabs a single item out of the pending task list and hands it back.
-   */
-  private synchronized Task getNewTaskForTaskTracker(String taskTracker
-                                                     ) throws IOException {
-    //
-    // Compute average map and reduce task numbers across pool
-    //
-    int remainingReduceLoad = 0;
-    int remainingMapLoad = 0;
-    int numTaskTrackers;
-    TaskTrackerStatus tts;
-
-    synchronized (taskTrackers) {
-      numTaskTrackers = taskTrackers.size();
-      tts = taskTrackers.get(taskTracker);
-    }
-    if (tts == null) {
-      LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
-      return null;
-    }
-
-    synchronized(jobsByPriority){
-      for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
-        JobInProgress job = (JobInProgress) it.next();
-        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-          int totalMapTasks = job.desiredMaps();
-          int totalReduceTasks = job.desiredReduces();
-          remainingMapLoad += (totalMapTasks - job.finishedMaps());
-          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
-        }
-      }   
-    }
-
-    int maxCurrentMapTasks = tts.getMaxMapTasks();
-    int maxCurrentReduceTasks = tts.getMaxReduceTasks();
-    // find out the maximum number of maps or reduces that we are willing
-    // to run on any node.
-    int maxMapLoad = 0;
-    int maxReduceLoad = 0;
-    if (numTaskTrackers > 0) {
-      maxMapLoad = Math.min(maxCurrentMapTasks,
-                            (int) Math.ceil((double) remainingMapLoad / 
-                                            numTaskTrackers));
-      maxReduceLoad = Math.min(maxCurrentReduceTasks,
-                               (int) Math.ceil((double) remainingReduceLoad
-                                               / numTaskTrackers));
-    }
-        
-    //
-    // Get map + reduce counts for the current tracker.
-    //
-
-    int numMaps = tts.countMapTasks();
-    int numReduces = tts.countReduceTasks();
-
-    //
-    // In the below steps, we allocate first a map task (if appropriate),
-    // and then a reduce task if appropriate.  We go through all jobs
-    // in order of job arrival; jobs only get serviced if their 
-    // predecessors are serviced, too.
-    //
-
-    //
-    // We hand a task to the current taskTracker if the given machine 
-    // has a workload that's less than the maximum load of that kind of
-    // task.
-    //
-       
-    synchronized (jobsByPriority) {
-      if (numMaps < maxMapLoad) {
-
-        int totalNeededMaps = 0;
-        for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
-          JobInProgress job = (JobInProgress) it.next();
-          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-            continue;
-          }
-
-          Task t = job.obtainNewMapTask(tts, numTaskTrackers,
-                                        uniqueHostsMap.size());
-          if (t != null) {
-            expireLaunchingTasks.addNewTask(t.getTaskID());
-            myMetrics.launchMap();
-            return t;
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededMaps += job.desiredMaps();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = Math.min(maxCurrentMapTasks,
-                               (int)(totalNeededMaps * PAD_FRACTION));
-          }
-          if (totalMaps + padding >= totalMapTaskCapacity) {
-            break;
-          }
-        }
-      }
-
-      //
-      // Same thing, but for reduce tasks
-      //
-      if (numReduces < maxReduceLoad) {
-
-        int totalNeededReduces = 0;
-        for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
-          JobInProgress job = (JobInProgress) it.next();
-          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
-              job.numReduceTasks == 0) {
-            continue;
-          }
-
-          Task t = job.obtainNewReduceTask(tts, numTaskTrackers, 
-                                           uniqueHostsMap.size());
-          if (t != null) {
-            expireLaunchingTasks.addNewTask(t.getTaskID());
-            myMetrics.launchReduce();
-            return t;
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededReduces += job.desiredReduces();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = 
-              Math.min(maxCurrentReduceTasks,
-                       (int) (totalNeededReduces * PAD_FRACTION));
-          }
-          if (totalReduces + padding >= totalReduceTaskCapacity) {
-            break;
-          }
-        }
-      }
-    }
-    return null;
-  }
 
   /**
    * A tracker wants to know if any of its Tasks have been
@@ -1762,10 +1602,6 @@
    * and JobStatus.  Those two sub-objects are sometimes shipped outside
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
-   *
-   * We add the JIP to the jobInitQueue, which is processed 
-   * asynchronously to handle split-computation and build up
-   * the right TaskTracker/Block mapping.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
     ensureRunning();
@@ -1777,13 +1613,10 @@
     totalSubmissions++;
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
     synchronized (jobs) {
-      synchronized (jobsByPriority) {
-        synchronized (jobInitQueue) {
-          jobs.put(job.getProfile().getJobID(), job);
-          jobsByPriority.add(job);
-          jobInitQueue.add(job);
-          resortPriority();
-          jobInitQueue.notifyAll();
+      synchronized (taskScheduler) {
+        jobs.put(job.getProfile().getJobID(), job);
+        for (JobInProgressListener listener : jobInProgressListeners) {
+          listener.jobAdded(job);
         }
       }
     }
@@ -1791,32 +1624,6 @@
     return job.getStatus();
   }
 
-  /**
-   * Sort jobs by priority and then by start time.
-   */
-  private synchronized void resortPriority() {
-    Comparator<JobInProgress> comp = new Comparator<JobInProgress>() {
-      public int compare(JobInProgress o1, JobInProgress o2) {
-        int res = o1.getPriority().compareTo(o2.getPriority());
-        if(res == 0) {
-          if(o1.getStartTime() < o2.getStartTime())
-            res = -1;
-          else
-            res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1);
-        }
-          
-        return res;
-      }
-    };
-    
-    synchronized(jobsByPriority) {
-      Collections.sort(jobsByPriority, comp);
-    }
-    synchronized (jobInitQueue) {
-      Collections.sort(jobInitQueue, comp);
-    }
-  }
-
   public synchronized ClusterStatus getClusterStatus() {
     synchronized (taskTrackers) {
       return new ClusterStatus(taskTrackers.size(),
@@ -2042,10 +1849,12 @@
   synchronized void setJobPriority(JobID jobId, JobPriority priority) {
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
-      job.setPriority(priority);
-      
-      // Re-sort jobs to reflect this change
-      resortPriority();
+      synchronized (taskScheduler) {
+        job.setPriority(priority);
+        for (JobInProgressListener listener : jobInProgressListeners) {
+          listener.jobUpdated(job);
+        }
+      }
     } else {
       LOG.warn("Trying to change the priority of an unknown job: " + jobId);
     }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link TaskScheduler} that limits the maximum number of tasks
+ * running for a job. The limit is set by means of the
+ * <code>mapred.jobtracker.taskScheduler.maxRunningTasksPerJob</code>
+ * property.
+ */
+class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {
+  
+  private static final Log LOG = LogFactory.getLog(
+    LimitTasksPerJobTaskScheduler.class.getName());
+  
+  public static final String MAX_TASKS_PER_JOB_PROPERTY = 
+    "mapred.jobtracker.taskScheduler.maxRunningTasksPerJob";
+  
+  private long maxTasksPerJob;
+  
+  public LimitTasksPerJobTaskScheduler() {
+    super();
+  }
+  
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    maxTasksPerJob = conf.getLong(MAX_TASKS_PER_JOB_PROPERTY, Long.MAX_VALUE);
+    if (maxTasksPerJob <= 0) {
+      String msg = MAX_TASKS_PER_JOB_PROPERTY +
+        " is set to zero or a negative value. Aborting.";
+      LOG.fatal(msg);
+      throw new RuntimeException(msg);
+    }
+  }
+
+  @Override
+  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+      throws IOException {
+
+    /* Stats about the current taskTracker */
+    final int mapTasksNumber = taskTracker.countMapTasks();
+    final int reduceTasksNumber = taskTracker.countReduceTasks();
+    final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
+    final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
+    /*
+     * Statistics about the whole cluster. Most are approximate because of
+     * concurrency
+     */
+    final int numTaskTrackers =
+      taskTrackerManager.getClusterStatus().getTaskTrackers();
+    final int[] maxMapAndReduceLoad = getMaxMapAndReduceLoad(
+        maximumMapTasksNumber, maximumReduceTasksNumber);
+    final int maximumMapLoad = maxMapAndReduceLoad[0];
+    final int maximumReduceLoad = maxMapAndReduceLoad[1];
+
+    
+    final int beginAtStep;
+    /*
+     * When step == 0, this loop starts as many map tasks as it can,
+     * subject to maxTasksPerJob.
+     * When step == 1, this loop starts as many reduce tasks as it can,
+     * subject to maxTasksPerJob.
+     * When step == 2, this loop starts as many map tasks as it can.
+     * When step == 3, this loop starts as many reduce tasks as it can.
+     *
+     * It may seem that we could improve this loop by queuing the jobs we
+     * cannot start in steps 0 and 1 because of maxTasksPerJob, and using
+     * that queue in steps 2 and 3.
+     * A first thing to notice is that the time with the current algorithm
+     * is logarithmic, because it is the sum of (p^k) for k from 1 to N,
+     * where N is the number of jobs and p is the probability for a job to
+     * not exceed limits. The probability for the cache to be useful would
+     * be similar to p^N, that is 1/(e^N), whereas its size and the time
+     * spent to manage it would be in ln(N).
+     * So it is not a good idea.
+     */
+    if (maxTasksPerJob != Long.MAX_VALUE) {
+      beginAtStep = 0;
+    }
+    else {
+      beginAtStep = 2;
+    }
+    for (int step = beginAtStep; step <= 3; ++step) {
+      /* If we reached the maximum load for this step, go to the next */
+      if ((step == 0 || step == 2) && mapTasksNumber >= maximumMapLoad ||
+          (step == 1 || step == 3) && reduceTasksNumber >= maximumReduceLoad) {
+        continue;
+      }
+      /* For each job, start its tasks */
+      Collection<JobInProgress> jobQueue =
+        jobQueueJobInProgressListener.getJobQueue();
+      synchronized (jobQueue) {
+        for (JobInProgress job : jobQueue) {
+          Task task;
+          /* Ignore non running jobs */
+          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
+          /* Check that we're not exceeding the global limits */
+          if ((step == 0 || step == 1)
+              && (job.runningMaps() + job.runningReduces() >= maxTasksPerJob)) {
+            continue;
+          }
+          if (step == 0 || step == 2) {
+            task = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+                taskTrackerManager.getNumberOfUniqueHosts());
+          }
+          else {
+            task = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+                taskTrackerManager.getNumberOfUniqueHosts());
+          }
+          if (task != null) {
+            return Collections.singletonList(task);
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Determine the maximum number of maps or reduces that we are willing to run
+ * on a taskTracker which accepts a maximum of localMaxMapLoad maps and
+ * localMaxReduceLoad reduces.
+   * @param localMaxMapLoad The local maximum number of map tasks for a host
+   * @param localMaxReduceLoad The local maximum number of reduce tasks for a
+   * host
+   * @return An array of the two maximums: map then reduce.
+   */
+  protected synchronized int[] getMaxMapAndReduceLoad(int localMaxMapLoad,
+      int localMaxReduceLoad) {
+    // Approximate because of concurrency
+    final int numTaskTrackers =
+      taskTrackerManager.getClusterStatus().getTaskTrackers();
+    /* Hold the result */
+    int maxMapLoad = 0;
+    int maxReduceLoad = 0;
+    int neededMaps = 0;
+    int neededReduces = 0;
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+    synchronized (jobQueue) {
+      for (JobInProgress job : jobQueue) {
+        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+          neededMaps += job.desiredMaps() - job.finishedMaps();
+          neededReduces += job.desiredReduces() - job.finishedReduces();
+        }
+      }
+    }
+    if (numTaskTrackers > 0) {
+      maxMapLoad = Math.min(localMaxMapLoad, (int) Math
+          .ceil((double) neededMaps / numTaskTrackers));
+      maxReduceLoad = Math.min(localMaxReduceLoad, (int) Math
+          .ceil((double) neededReduces / numTaskTrackers));
+    }
+    return new int[] { maxMapLoad, maxReduceLoad };
+  }
+
+}

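For concreteness (numbers invented): with 10 tasktrackers, 35 maps still
needed across all running jobs, and a tracker whose localMaxMapLoad is 4,
getMaxMapAndReduceLoad returns min(4, ceil(35 / 10)) = 4 on the map side;
the per-tracker cap only bites once the cluster-wide demand per tracker
drops below it.
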
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by a {@link JobTracker} to schedule {@link Task}s on
+ * {@link TaskTracker}s.
+ * <p>
+ * {@link TaskScheduler}s typically use one or more
+ * {@link JobInProgressListener}s to receive notifications about jobs.
+ * <p>
+ * It is the responsibility of the {@link TaskScheduler}
+ * to initialize tasks for a job, by calling {@link JobInProgress#initTasks()}
+ * between the job being added (when
+ * {@link JobInProgressListener#jobAdded(JobInProgress)} is called)
+ * and tasks for that job being assigned (by
+ * {@link #assignTasks(TaskTrackerStatus)}).
+ * @see EagerTaskInitializationListener
+ */
+abstract class TaskScheduler implements Configurable {
+
+  protected Configuration conf;
+  protected TaskTrackerManager taskTrackerManager;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public synchronized void setTaskTrackerManager(
+      TaskTrackerManager taskTrackerManager) {
+    this.taskTrackerManager = taskTrackerManager;
+  }
+  
+  /**
+   * Lifecycle method to allow the scheduler to start any work in separate
+   * threads.
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Lifecycle method to allow the scheduler to stop any work it is doing.
+   * @throws IOException
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+
+  /**
+   * Returns the tasks we'd like the TaskTracker to execute right now.
+   * 
+   * @param taskTracker The TaskTracker for which we're looking for tasks.
+   * @return A list of tasks to run on that TaskTracker, possibly empty, or
+   * <code>null</code> if no tasks can be assigned.
+   */
+  public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
+    throws IOException;
+  
+}
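
The contract above is small: configure, attach a manager, start, answer assignTasks(), terminate. Below is a hedged sketch of a do-nothing scheduler built on it; the class name and empty policy are hypothetical, JobQueueJobInProgressListener (added by this commit) is assumed to have a no-arg constructor, and the listener registration uses the TaskTrackerManager interface defined in the next file:

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

// Hypothetical minimal scheduler; not part of the commit.
class EmptyTaskScheduler extends TaskScheduler {

  private final JobQueueJobInProgressListener queueListener =
      new JobQueueJobInProgressListener();

  @Override
  public void start() throws IOException {
    // Subscribe to job notifications (e.g. jobAdded) before any
    // assignTasks() calls arrive; taskTrackerManager is the inherited field.
    taskTrackerManager.addJobInProgressListener(queueListener);
  }

  @Override
  public void terminate() throws IOException {
    taskTrackerManager.removeJobInProgressListener(queueListener);
  }

  @Override
  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
      throws IOException {
    // A real policy would walk queueListener.getJobQueue() and call
    // obtainNewMapTask()/obtainNewReduceTask() on eligible jobs.
    return Collections.emptyList();
  }
}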

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+/**
+ * Manages information about the {@link TaskTracker}s running on a cluster.
+ * This interface exists primarily to test the {@link JobTracker}, and is not
+ * intended to be implemented by users.
+ */
+interface TaskTrackerManager {
+
+  /**
+   * @return A collection of the {@link TaskTrackerStatus} for the tasktrackers
+   * being managed.
+   */
+  public Collection<TaskTrackerStatus> taskTrackers();
+  
+  /**
+   * @return The number of unique hosts running tasktrackers.
+   */
+  public int getNumberOfUniqueHosts();
+  
+  /**
+   * @return a summary of the cluster's status.
+   */
+  public ClusterStatus getClusterStatus();
+
+  /**
+   * Registers a {@link JobInProgressListener} for updates from this
+   * {@link TaskTrackerManager}.
+   * @param listener the {@link JobInProgressListener} to add
+   */
+  public void addJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Unregisters a {@link JobInProgressListener} from this
+   * {@link TaskTrackerManager}.
+   * @param listener the {@link JobInProgressListener} to remove
+   */
+  public void removeJobInProgressListener(JobInProgressListener listener);
+
+}
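
For completeness, a sketch of how a host (the JobTracker, or a test double such as the FakeTaskTrackerManager in the next file) might wire the pieces together reflectively. The helper class is hypothetical; Configuration.getClass and ReflectionUtils.newInstance are standard Hadoop utilities, and the property name and default follow this commit:

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

// Hypothetical wiring helper; not part of the commit.
class SchedulerLoader {

  static TaskScheduler load(Configuration conf, TaskTrackerManager manager)
      throws IOException {
    Class<? extends TaskScheduler> schedulerClass =
        conf.getClass("mapred.jobtracker.taskScheduler",
                      JobQueueTaskScheduler.class, TaskScheduler.class);
    // ReflectionUtils calls setConf() since TaskScheduler is Configurable
    TaskScheduler scheduler = ReflectionUtils.newInstance(schedulerClass, conf);
    scheduler.setTaskTrackerManager(manager);
    scheduler.start();
    return scheduler;
  }
}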

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+
+public class TestJobQueueTaskScheduler extends TestCase {
+  
+  private static int jobCounter;
+  private static int taskCounter;
+  
+  static class FakeJobInProgress extends JobInProgress {
+    
+    private FakeTaskTrackerManager taskTrackerManager;
+    
+    public FakeJobInProgress(JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf);
+      this.taskTrackerManager = taskTrackerManager;
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus();
+      this.status.setRunState(JobStatus.PREP);
+    }
+    
+    @Override
+    public synchronized void initTasks() throws IOException {
+      // do nothing
+    }
+
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      TaskAttemptID attemptId = getTaskAttemptID(true);
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.update(tts.getTrackerName(), task);
+      runningMapTasks++;
+      return task;
+    }
+    
+    @Override
+    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
+      TaskAttemptID attemptId = getTaskAttemptID(false);
+      Task task = new ReduceTask("", attemptId, 0, 10) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.update(tts.getTrackerName(), task);
+      runningReduceTasks++;
+      return task;
+    }
+    
+    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+      JobID jobId = getJobID();
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), isMap, ++taskCounter, 0);
+    }
+  }
+  
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 2;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+
+    public FakeTaskTrackerManager() {
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    @Override
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, maps, reduces,
+          numTrackers * maxMapTasksPerTracker,
+          numTrackers * maxReduceTasksPerTracker,
+          JobTracker.State.RUNNING);
+    }
+
+    @Override
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    @Override
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+    @Override
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    @Override
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+    
+    // Test methods
+    
+    public void submitJob(JobInProgress job) {
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+    
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return trackers.get(trackerID);
+    }
+    
+    public void update(String taskTrackerName, final Task t) {
+      if (t.isMapTask()) {
+        maps++;
+      } else {
+        reduces++;
+      }
+      TaskStatus status = new TaskStatus() {
+        @Override
+        public boolean getIsMap() {
+          return t.isMapTask();
+        }
+      };
+      status.setRunState(TaskStatus.State.RUNNING);
+      trackers.get(taskTrackerName).getTaskReports().add(status);
+    }
+    
+  }
+  
+  protected JobConf jobConf;
+  protected TaskScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+
+  @Override
+  protected void setUp() throws Exception {
+    jobCounter = 0;
+    taskCounter = 0;
+    jobConf = new JobConf();
+    jobConf.setNumMapTasks(10);
+    jobConf.setNumReduceTasks(10);
+    taskTrackerManager = new FakeTaskTrackerManager();
+    scheduler = createTaskScheduler();
+    scheduler.setConf(jobConf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.start();
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.terminate();
+    }
+  }
+  
+  protected TaskScheduler createTaskScheduler() {
+    return new JobQueueTaskScheduler();
+  }
+  
+  protected void submitJobs(int number, int state)
+    throws IOException {
+    for (int i = 0; i < number; i++) {
+      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+      job.getStatus().setRunState(state);
+      taskTrackerManager.submitJob(job);
+    }
+  }
+
+  public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
+  public void testNonRunningJobsAreIgnored() throws IOException {
+    submitJobs(1, JobStatus.PREP);
+    submitJobs(1, JobStatus.SUCCEEDED);
+    submitJobs(1, JobStatus.FAILED);
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+  
+  public void testDefaultTaskAssignment() throws IOException {
+    submitJobs(2, JobStatus.RUNNING);
+    
+    // All slots are filled with job 1
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
+
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected void checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+  }
+  
+}
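
The attempt-ID strings asserted above are deterministic given the counters reset in setUp(); the following illustrative snippet (not part of the commit) reproduces two of them with the same TaskAttemptID constructor used by FakeJobInProgress:

package org.apache.hadoop.mapred;

// Illustrative only: rebuilds two of the IDs asserted in
// testDefaultTaskAssignment() above.
public class AttemptIdExample {
  public static void main(String[] args) {
    // jtIdentifier "test", job 1, isMap=true, task 1, attempt 0
    System.out.println(new TaskAttemptID("test", 1, true, 1, 0));
    // prints: attempt_test_0001_m_000001_0
    System.out.println(new TaskAttemptID("test", 1, false, 3, 0));
    // prints: attempt_test_0001_r_000003_0
  }
}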

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java?rev=678193&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java Sat Jul 19 10:50:52 2008
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+public class TestLimitTasksPerJobTaskScheduler
+  extends TestJobQueueTaskScheduler {
+  
+  @Override
+  protected TaskScheduler createTaskScheduler() {
+    return new LimitTasksPerJobTaskScheduler();
+  }
+
+  public void testMaxRunningTasksPerJob() throws IOException {
+    jobConf.setLong(LimitTasksPerJobTaskScheduler.MAX_TASKS_PER_JOB_PROPERTY,
+        4L);
+    scheduler.setConf(jobConf);
+    submitJobs(2, JobStatus.RUNNING);
+    
+    // First 4 slots are filled with job 1, second 4 with job 2
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+  }
+  
+  public void testMaxRunningTasksPerJobWithInterleavedTrackers()
+      throws IOException {
+    jobConf.setLong(LimitTasksPerJobTaskScheduler.MAX_TASKS_PER_JOB_PROPERTY,
+        4L);
+    scheduler.setConf(jobConf);
+    submitJobs(2, JobStatus.RUNNING);
+    
+    // First 4 slots are filled with job 1, second 4 with job 2
+    // even when tracker requests are interleaved
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_r_000006_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+  }
+  
+}


