hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r722760 [1/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/...
Date Wed, 03 Dec 2008 04:32:19 GMT
Author: yhemanth
Date: Tue Dec  2 20:32:18 2008
New Revision: 722760

URL: http://svn.apache.org/viewvc?rev=722760&view=rev
Log:
HADOOP-4035. Support memory based scheduling in capacity scheduler. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
Removed:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/capacity-scheduler.xml.template
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec  2 20:32:18 2008
@@ -44,6 +44,9 @@
     HADOOP-4422. S3 file systems should not create bucket.
     (David Phillips via tomwhite)
 
+    HADOOP-4035. Support memory based scheduling in capacity scheduler.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
   NEW FEATURES
 
     HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.

Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Tue Dec  2 20:32:18 2008
@@ -73,6 +73,37 @@
       account in scheduling decisions by default in a job queue.
     </description>
   </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem</name>
+    <value>-1</value>
+    <description>If mapred.task.maxpmem is set to -1, this configuration will
+      be used to calculate job's physical memory requirements as a percentage of
+      the job's virtual memory requirements set via mapred.task.maxvmem. This
+      property thus provides a default value of physical memory for jobs that
+      don't explicitly specify physical memory requirements.
+
+      If not explicitly set to a valid value, scheduler will not consider
+      physical memory for scheduling even if virtual memory based scheduling is
+      enabled(by setting valid values for both mapred.task.default.maxvmem and
+      mapred.task.limit.maxvmem).
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.task.limit.maxpmem</name>
+    <value>-1</value>
+    <description>Configuration that provides an upper limit on the maximum
+      physical memory that can be specified by a job. The job configuration
+      mapred.task.maxpmem should be less than this value. If not, the job will
+      be rejected by the scheduler.
+      
+      If it is set to -1, scheduler will not consider physical memory for
+      scheduling even if virtual memory based scheduling is enabled(by setting
+      valid values for both mapred.task.default.maxvmem and
+      mapred.task.limit.maxvmem).
+    </description>
+  </property>
   
   <property>
     <name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Tue Dec  2 20:32:18 2008
@@ -740,6 +740,143 @@
 </property>
 
 <property>
+  <name>mapred.tasktracker.vmem.reserved</name>
+  <value>-1</value>
+  <description>Configuration property to specify the amount of virtual memory
+    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
+    The reserved virtual memory should be a part of the total virtual memory
+    available on the TaskTracker.
+    
+    The reserved virtual memory and the total virtual memory values are
+    reported by the TaskTracker as part of heart-beat so that they can
+    considered by a scheduler. Please refer to the documentation of the
+    configured scheduler to see how this property is used.
+    
+    These two values are also used by a TaskTracker for tracking tasks' memory
+    usage. Memory management functionality on a TaskTracker is disabled if this
+    property is set to -1, if it is more than the total virtual memory on the 
+    tasktracker, or if either of the values is negative.
+  </description>
+</property>
+
+<property>
+  <name>mapred.tasktracker.pmem.reserved</name>
+  <value>-1</value>
+  <description>Configuration property to specify the amount of physical memory
+    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
+    The reserved physical memory should be a part of the total physical memory
+    available on the TaskTracker.
+
+    The reserved physical memory and the total physical memory values are
+    reported by the TaskTracker as part of heart-beat so that they can be
+    considered by a scheduler. Please refer to the documentation of the
+    configured scheduler to see how this property is used.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.default.maxvmem</name>
+  <value>-1</value>
+  <description>
+    Cluster-wide configuration in bytes to be set by the administrators that
+    provides default amount of maximum virtual memory for job's tasks. This has
+    to be set on both the JobTracker node for the sake of scheduling decisions
+    and on the TaskTracker nodes for the sake of memory management.
+
+    If a job doesn't specify its virtual memory requirement by setting
+    mapred.task.maxvmem to -1, tasks are assured a memory limit set
+    to this property. This property is set to -1 by default.
+
+    This value should in general be less than the cluster-wide
+    configuration mapred.task.limit.maxvmem. If not or if it is not set,
+    TaskTracker's memory management will be disabled and a scheduler's memory
+    based scheduling decisions may be affected. Please refer to the
+    documentation of the configured scheduler to see how this property is used.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.limit.maxvmem</name>
+  <value>-1</value>
+  <description>
+    Cluster-wide configuration in bytes to be set by the site administrators
+    that provides an upper limit on the maximum virtual memory that can be
+    specified by a job via mapred.task.maxvmem. This has to be set on both the
+    JobTracker node for the sake of scheduling decisions and on the TaskTracker
+    nodes for the sake of memory management.
+    
+    The job configuration mapred.task.maxvmem should not be more than this
+    value, otherwise depending on the scheduler being configured, the job may
+    be rejected or the job configuration may just be ignored. Please refer to
+    the documentation of the configured scheduler to see how this property is
+    used.
+
+    If it is not set on a TaskTracker, the TaskTracker's memory management will be
+    disabled.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.maxvmem</name>
+  <value>-1</value>
+  <description>
+    The maximum amount of virtual memory any task of a job will use, in bytes.
+
+    This value will be used by TaskTrackers for monitoring the memory usage of
+    tasks of this job. If a TaskTracker's memory management functionality is
+    enabled, each task of this job will be allowed to use a maximum virtual
+    memory specified by this property. If the task's memory usage goes over 
+    this value, the task will be failed by the TT. If not set, the
+    cluster-wide configuration mapred.task.default.maxvmem is used as the
+    default value for memory requirements. If this property cascaded with
+    mapred.task.default.maxvmem becomes equal to -1, the job's tasks will
+    not be assured any particular amount of virtual memory and may be killed by
+    a TT that intends to control the total memory usage of the tasks via memory
+    management functionality. If the memory management functionality is
+    disabled on a TT, this value is ignored.
+
+    This value should not be more than the cluster-wide configuration
+    mapred.task.limit.maxvmem.
+
+    This value may be used by schedulers that support scheduling based on job's
+    memory requirements. Please refer to the documentation of the scheduler
+    being configured to see if it does memory based scheduling and if it does,
+    how this property is used by that scheduler.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.maxpmem</name>
+  <value>-1</value>
+  <description>
+   The maximum amount of physical memory any task of a job will use in bytes.
+
+   This value may be used by schedulers that support scheduling based on job's
+   memory requirements. In general, a task of this job will be scheduled on a
+   TaskTracker, only if the amount of physical memory still unoccupied on the
+   TaskTracker is greater than or equal to this value. Different schedulers can
+   take different decisions, some might just ignore this value. Please refer to
+   the documentation of the scheduler being configured to see if it does
+   memory based scheduling and if it does, how this variable is used by that
+   scheduler.
+  </description>
+</property>
+
+<property>
+  <name>mapred.tasktracker.memory_calculator_plugin</name>
+  <value></value>
+  <description>
+   Name of the class whose instance will be used to query memory information
+   on the tasktracker.
+   
+   The class must be an instance of 
+   org.apache.hadoop.util.MemoryCalculatorPlugin. If the value is null, the
+   tasktracker attempts to use a class appropriate to the platform. 
+   Currently, the only platform supported is Linux.
+  </description>
+</property>
+
+<property>
   <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
   <value>5000</value>
   <description>The interval, in milliseconds, for which the tasktracker waits

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue Dec  2 20:32:18 2008
@@ -43,6 +43,34 @@
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
     "mapred.capacity-scheduler.queue.";
 
+  /**
+   * If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
+   * {@link JobConf#DISABLED_MEMORY_LIMIT}, this configuration will be used to
+   * calculate job's physical memory requirements as a percentage of the job's
+   * virtual memory requirements set via
+   * {@link JobConf#setMaxVirtualMemoryForTask()}. This property thus provides
+   * a default value of physical memory for jobs that don't explicitly specify
+   * physical memory requirements.
+   * 
+   * It defaults to {@link JobConf#DISABLED_MEMORY_LIMIT} and if not explicitly
+   * set to a valid value, scheduler will not consider physical memory for
+   * scheduling even if virtual memory based scheduling is enabled.
+   */
+  static String DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY =
+      "mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem";
+
+  /**
+   * Configuration that provides an upper limit on the maximum physical memory
+   * that can be specified by a job. The job configuration
+   * {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} should,
+   * by definition, be less than this value. If not, the job will be rejected
+   * by the scheduler. If it is set to {@link JobConf#DISABLED_MEMORY_LIMIT},
+   * scheduler will not consider physical memory for scheduling even if virtual
+   * memory based scheduling is enabled.
+   */
+  static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
+      "mapred.capacity-scheduler.task.limit.maxpmem";
+
   private Configuration rmConf;
 
   private int defaultMaxJobsPerUsersToInitialize;
@@ -348,4 +376,44 @@
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
+
+  /**
+   * Get the upper limit on the maximum physical memory that can be specified by
+   * a job.
+   * 
+   * @return upper limit for max pmem for tasks.
+   */
+  public long getLimitMaxPmemForTasks() {
+    return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
+        JobConf.DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Get the upper limit on the maximum physical memory that can be specified by
+   * a job.
+   * 
+   * @param value
+   */
+  public void setLimitMaxPmemForTasks(long value) {
+    rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
+  }
+
+  /**
+   * Get cluster-wide default percentage of pmem in vmem.
+   * 
+   * @return cluster-wide default percentage of pmem in vmem.
+   */
+  public float getDefaultPercentOfPmemInVmem() {
+    return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
+        JobConf.DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Set cluster-wide default percentage of pmem in vmem.
+   * 
+   * @param value
+   */
+  public void setDefaultPercentOfPmemInVmem(float value) {
+    rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
+  }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Dec  2 20:32:18 2008
@@ -229,8 +229,40 @@
       return sb.toString();
     }
   }
-  
-  
+
+  private static enum TaskLookUpStatus {
+    TASK_FOUND,
+    NO_TASK_IN_JOB,
+    NO_TASK_IN_QUEUE,
+    NO_TASK_MATCHING_MEMORY_REQUIREMENTS,
+  }
+
+  private static class TaskLookupResult {
+
+    private Task task;
+    private String lookupStatusInfo;
+
+    private TaskLookUpStatus lookUpStatus;
+
+    TaskLookupResult(Task t, TaskLookUpStatus lUStatus, String statusInfo) {
+      this.task = t;
+      this.lookUpStatus = lUStatus;
+      this.lookupStatusInfo = statusInfo;
+    }
+
+    Task getTask() {
+      return task;
+    }
+
+    TaskLookUpStatus getLookUpStatus() {
+      return lookUpStatus;
+    }
+
+    String getLookupStatusInfo() {
+      return lookupStatusInfo;
+    }
+  }
+
   /** 
    * This class handles the scheduling algorithms. 
    * The algos are the same for both Map and Reduce tasks. 
@@ -247,7 +279,11 @@
     /** our enclosing TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
     // for debugging
-    protected String type = null;
+    protected static enum TYPE {
+      MAP, REDUCE
+    }
+
+    protected TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
@@ -538,7 +574,6 @@
       }
     }
 
-    
     void jobAdded(JobInProgress job) {
       // update qsi 
       QueueSchedulingInfo qsi = 
@@ -558,6 +593,7 @@
       LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
                 + job.getProfile().getUser() + ", user now has " + i + " jobs");
     }
+
     void jobRemoved(JobInProgress job) {
       // update qsi 
       QueueSchedulingInfo qsi = 
@@ -627,61 +663,133 @@
         return false;
       }
     }
-    
-    private Task getTaskFromQueue(TaskTrackerStatus taskTracker, 
-        QueueSchedulingInfo qsi) throws IOException {
-      Task t = null;
+
+    private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
+        QueueSchedulingInfo qsi)
+        throws IOException {
+
       // keep track of users over limit
       Set<String> usersOverLimit = new HashSet<String>();
-      // look at running jobs first
-      for (JobInProgress j:
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        // some jobs may be in the running queue but may have completed 
+
+      // Look at running jobs first, skipping jobs of those users who are over
+      // their limits
+      TaskLookupResult result =
+          getTaskFromRunningJobQueue(taskTracker, qsi, usersOverLimit, true);
+      TaskLookUpStatus lookUpStatus = result.getLookUpStatus();
+      if (lookUpStatus == TaskLookUpStatus.TASK_FOUND
+          || lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
+        // No need for looking elsewhere
+        return result;
+      }
+
+      // if we're here, we haven't found anything. This could be because
+      // there is nothing to run, or that the user limit for some user is
+      // too strict, i.e., there's at least one user who doesn't have
+      // enough tasks to satisfy his limit. If it's the latter case, look at
+      // jobs without considering user limits, and get task from first
+      // eligible job
+      if (usersOverLimit.size() > 0) {
+        // look at running jobs, considering users over limit
+        result =
+            getTaskFromRunningJobQueue(taskTracker, qsi, usersOverLimit, false);
+        lookUpStatus = result.getLookUpStatus();
+        if (lookUpStatus == TaskLookUpStatus.TASK_FOUND
+            || lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
+          // No need for looking elsewhere
+          return result;
+        }
+      }
+
+      // found nothing for this queue, look at the next one.
+      String msg = "Found no task from the queue" + qsi.queueName;
+      LOG.info(msg);
+      return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
+          msg);
+    }
+
+    // get a task from the running queue
+    private TaskLookupResult getTaskFromRunningJobQueue(
+        TaskTrackerStatus taskTracker, QueueSchedulingInfo qsi,
+        Set<String> usersOverLimit, boolean skipUsersOverLimit)
+        throws IOException {
+
+      for (JobInProgress j : scheduler.jobQueuesManager
+          .getRunningJobQueue(qsi.queueName)) {
+        // some jobs may be in the running queue but may have completed
         // and not yet have been removed from the running queue
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        // is this job's user over limit?
-        if (isUserOverLimit(j, qsi)) {
-          // user over limit. 
-          usersOverLimit.add(j.getProfile().getUser());
-          continue;
+
+        if (skipUsersOverLimit) {
+          // consider jobs of only those users who are under limits
+          if (isUserOverLimit(j, qsi)) {
+            usersOverLimit.add(j.getProfile().getUser());
+            continue;
+          }
+        } else {
+          // consider jobs of only those users who are over limit
+          if (!usersOverLimit.contains(j.getProfile().getUser())) {
+            continue;
+          }
         }
-        // We found a suitable job. Get task from it.
-        t = obtainNewTask(taskTracker, j);
-        if (t != null) {
-          LOG.debug("Got task from job " + 
-                    j.getJobID() + " in queue " + qsi.queueName);
-          return t;
+
+        // We found a suitable job. Try getting a task from it.
+        TaskLookupResult tlr = getTaskFromJob(j, taskTracker, qsi);
+        TaskLookUpStatus lookUpStatus = tlr.getLookUpStatus();
+        if (lookUpStatus == TaskLookUpStatus.NO_TASK_IN_JOB) {
+          // Go to the next job in the same queue.
+          continue;
+        } else if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS
+            || lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
+          // No need for considering the next jobs in this queue.
+          return tlr;
         }
       }
-      
 
-      
-      // if we're here, we haven't found anything. This could be because 
-      // there is nothing to run, or that the user limit for some user is 
-      // too strict, i.e., there's at least one user who doesn't have
-      // enough tasks to satisfy his limit. If it's the later case, look at 
-      // jobs without considering user limits, and get task from first 
-      // eligible job
-      if (usersOverLimit.size() > 0) {
-        for (JobInProgress j:
-          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-          if ((j.getStatus().getRunState() == JobStatus.RUNNING) && 
-              (usersOverLimit.contains(j.getProfile().getUser()))) {
-            t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              LOG.debug("Getting task from job " + 
-                        j.getJobID() + " in queue " + qsi.queueName);
-              return t;
-            }
+      String msg =
+          qsi.queueName + " queue's running jobs queue don't have "
+              + "any more tasks to run.";
+      LOG.info(msg);
+      return new TaskLookupResult(null,
+          TaskLookUpStatus.NO_TASK_IN_QUEUE, msg);
+    }
+
+    private TaskLookupResult getTaskFromJob(JobInProgress j,
+        TaskTrackerStatus taskTracker, QueueSchedulingInfo qsi)
+        throws IOException {
+      String msg;
+
+      if (getPendingTasks(j) != 0) {
+        // Not accurate TODO:
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTracker, j);
+          if (t != null) {
+            msg =
+                "Got task from job " + j.getJobID() + " in queue "
+                    + qsi.queueName;
+            LOG.debug(msg);
+            return new TaskLookupResult(t, TaskLookUpStatus.TASK_FOUND, msg);
           }
+        } else {
+          // block the cluster, till this job's tasks can be scheduled.
+          msg =
+              j.getJobID() + "'s tasks don't fit on the TaskTracker "
+                  + taskTracker.trackerName
+                  + ". Returning no task to the taskTracker";
+          LOG.info(msg);
+          return new TaskLookupResult(null,
+              TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS, msg);
         }
       }
-      
-      return null;
+
+      msg = j.getJobID() + " doesn't have any more tasks to run.";
+      LOG.debug(msg);
+      return new TaskLookupResult(null,
+          TaskLookUpStatus.NO_TASK_IN_JOB, msg);
     }
-    
+
     private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
       Task t = null;
 
@@ -692,7 +800,7 @@
        * becomes expensive, do it once every few hearbeats only.
        */ 
       updateQSIObjects();
-      LOG.debug("After updating QSI objects:");
+      LOG.debug("After updating QSI objects in " + this.type + " scheduler :");
       printQSIs();
       /*
        * sort list of qeues first, as we want queues that need the most to
@@ -700,7 +808,7 @@
        * We're only sorting a collection of queues - there shouldn't be many.
        */
       updateCollectionOfQSIs();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         if (qsi.guaranteedCapacity <= 0.0f) {
           // No capacity is guaranteed yet for this queue.
           // Queues are sorted so that ones without capacities
@@ -708,13 +816,32 @@
           // from here without considering any further queues.
           return null;
         }
-        t = getTaskFromQueue(taskTracker, qsi);
-        if (t!= null) {
+
+        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
+        TaskLookUpStatus lookUpStatus = tlr.getLookUpStatus();
+
+        if (lookUpStatus == TaskLookUpStatus.NO_TASK_IN_QUEUE) {
+          continue; // Look in other queues.
+        }
+
+        if (lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
+          t = tlr.getTask();
           // we have a task. Update reclaimed resource info
           updateReclaimedResources(qsi);
           return Collections.singletonList(t);
         }
-      }        
+        
+        if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
+          // blocking the cluster.
+          String msg = tlr.getLookupStatusInfo();
+          if (msg != null) {
+            LOG.warn(msg);
+            LOG.warn("Returning nothing to the Tasktracker "
+                + taskTracker.trackerName);
+            return null;
+          }
+        }
+      }
 
       // nothing to give
       return null;
@@ -745,7 +872,7 @@
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
     MapSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = new String("map");
+      type = TaskSchedulingMgr.TYPE.MAP;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
@@ -822,7 +949,7 @@
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
     ReduceSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = new String("reduce");
+      type = TaskSchedulingMgr.TYPE.REDUCE;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
@@ -870,7 +997,9 @@
   /** the scheduling mgrs for Map and Reduce tasks */ 
   protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
   protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
-  
+
+  MemoryMatcher memoryMatcher = new MemoryMatcher(this);
+
   /** name of the default queue. */ 
   static final String DEFAULT_QUEUE_NAME = "default";
   
@@ -880,7 +1009,7 @@
    * heartbeats left. */
   private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
 
-  private static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
   protected CapacitySchedulerConf rmConf;
   /** whether scheduler has started or not */
@@ -924,6 +1053,10 @@
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
+  long limitMaxVmemForTasks;
+  long limitMaxPmemForTasks;
+  long defaultMaxVmPerTask;
+  float defaultPercentOfPmemInVmem;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -939,7 +1072,40 @@
   public void setResourceManagerConf(CapacitySchedulerConf conf) {
     this.rmConf = conf;
   }
-  
+
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized value
+   */
+  private long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
+  private void initializeMemoryRelatedConf() {
+    limitMaxVmemForTasks =
+        normalizeMemoryConfigValue(conf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    limitMaxPmemForTasks =
+        normalizeMemoryConfigValue(rmConf.getLimitMaxPmemForTasks());
+
+    defaultMaxVmPerTask =
+        normalizeMemoryConfigValue(conf.getLong(
+            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    defaultPercentOfPmemInVmem = rmConf.getDefaultPercentOfPmemInVmem();
+    if (defaultPercentOfPmemInVmem < 0) {
+      defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+  }
+
   @Override
   public synchronized void start() throws IOException {
     if (started) return;
@@ -947,10 +1113,14 @@
     RECLAIM_CAPACITY_INTERVAL = 
       conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
     RECLAIM_CAPACITY_INTERVAL *= 1000;
+
     // initialize our queues from the config settings
     if (null == rmConf) {
       rmConf = new CapacitySchedulerConf();
     }
+
+    initializeMemoryRelatedConf();
+
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
     Set<String> queues = queueManager.getQueues();
@@ -1118,13 +1288,52 @@
     }
     return tasks;
   }
-  
+
+  /**
+   * Kill the job if its memory requirements exceed the cluster's limits.
+   * 
+   * @param job the job to validate
+   * @return a message explaining why the job was killed; null if the job has
+   *         valid requirements or if the kill attempt itself failed.
+   */
+  private String killJobIfInvalidRequirements(JobInProgress job) {
+    if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
+      return null;
+    }
+    if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
+        || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
+            .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
+      String msg =
+          job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
+              + job.getMaxPhysicalMemoryForTask()
+              + "pmem) exceeds the cluster's max-memory-limits ("
+              + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
+              + "pmem). Cannot run in this cluster, so killing it.";
+      LOG.warn(msg);
+      try {
+        taskTrackerManager.killJob(job.getJobID());
+        return msg;
+      } catch (IOException ioe) {
+        LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
+            + StringUtils.stringifyException(ioe));
+      }
+    }
+    return null;
+  }
+
   // called when a job is added
-  synchronized void jobAdded(JobInProgress job) {
+  synchronized void jobAdded(JobInProgress job) throws IOException {
     // let our map and reduce schedulers know this, so they can update 
     // user-specific info
     mapScheduler.jobAdded(job);
     reduceScheduler.jobAdded(job);
+
+    // Kill the job if it cannot run in the cluster because of invalid
+    // resource requirements.
+    String statusMsg = killJobIfInvalidRequirements(job);
+    if (statusMsg != null) {
+      throw new IOException(statusMsg);
+    }
   }
 
   // called when a job completes

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Tue Dec  2 20:32:18 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -158,7 +159,7 @@
   }
   
   @Override
-  public void jobAdded(JobInProgress job) {
+  public void jobAdded(JobInProgress job) throws IOException {
     LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
     // add job to the right queue
     QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=722760&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue Dec  2 20:32:18 2008
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class MemoryMatcher {
+
+  private static final Log LOG = LogFactory.getLog(MemoryMatcher.class);
+  private CapacityTaskScheduler scheduler;
+
+  public MemoryMatcher(CapacityTaskScheduler capacityTaskScheduler) {
+    this.scheduler = capacityTaskScheduler;
+  }
+
+  boolean isSchedulingBasedOnVmemEnabled() {
+    LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
+        + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
+    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  boolean isSchedulingBasedOnPmemEnabled() {
+    LOG.debug("defaultPercentOfPmemInVmem : "
+        + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
+        + scheduler.limitMaxPmemForTasks);
+    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Obtain the virtual memory allocated for a job's tasks.
+   * 
+   * If the job has a configured value for the max-virtual memory, that will be
+   * returned. Else, the cluster-wide default max-virtual memory for tasks is
+   * returned.
+   * 
+   * This method can only be called after
+   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
+   * 
+   * @param jConf JobConf of the job
+   * @return the virtual memory allocated for the job's tasks.
+   */
+  private long getVirtualMemoryForTask(JobConf jConf) {
+    long vMemForTask = jConf.getMaxVirtualMemoryForTask();
+    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      vMemForTask =
+          new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+              scheduler.defaultMaxVmPerTask);
+    }
+    return vMemForTask;
+  }
+
+  /**
+   * Obtain the physical memory allocated for a job's tasks.
+   * 
+   * If the job has a configured value for the max physical memory, that
+   * will be returned. Else, the cluster-wide default physical memory for
+   * tasks is returned.
+   * 
+   * This method can only be called after
+   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
+   * 
+   * @param jConf JobConf of the job
+   * @return the physical memory allocated for the job's tasks
+   */
+  private long getPhysicalMemoryForTask(JobConf jConf) {
+    long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
+    if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      pMemForTask =
+          Math.round(getVirtualMemoryForTask(jConf)
+              * scheduler.defaultPercentOfPmemInVmem);
+    }
+    return pMemForTask;
+  }
+
+  static class Memory {
+    long vmem;
+    long pmem;
+
+    Memory(long vm, long pm) {
+      this.vmem = vm;
+      this.pmem = pm;
+    }
+  }
+
+  /**
+   * Find the memory that is already used by all the running tasks
+   * residing on the given TaskTracker.
+   * 
+   * @param taskTracker
+   * @return amount of memory that is used by the residing tasks
+   */
+  private synchronized Memory getMemReservedForTasks(
+      TaskTrackerStatus taskTracker) {
+    boolean disabledVmem = false;
+    boolean disabledPmem = false;
+
+    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      disabledVmem = true;
+    }
+
+    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
+      disabledPmem = true;
+    }
+
+    if (disabledVmem && disabledPmem) {
+      return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
+          JobConf.DISABLED_MEMORY_LIMIT);
+    }
+
+    long vmem = 0;
+    long pmem = 0;
+
+    for (TaskStatus task : taskTracker.getTaskReports()) {
+      // the following task states are ones in which the slot is
+      // still occupied, and hence the task's memory should be
+      // accounted as used memory.
+      if ((task.getRunState() == TaskStatus.State.RUNNING)
+          || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
+        JobConf jConf =
+            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
+                .getJobConf();
+        if (!disabledVmem) {
+          vmem += getVirtualMemoryForTask(jConf);
+        }
+        if (!disabledPmem) {
+          pmem += getPhysicalMemoryForTask(jConf);
+        }
+      }
+    }
+
+    return new Memory(vmem, pmem);
+  }
+
+  /**
+   * Check if a TT has enough pmem and vmem to run this job.
+   * @param job
+   * @param taskTracker
+   * @return true if this TT has enough memory for this job. False otherwise.
+   */
+  boolean matchesMemoryRequirements(JobInProgress job,
+      TaskTrackerStatus taskTracker) {
+
+    // ////////////// vmem based scheduling
+    if (!isSchedulingBasedOnVmemEnabled()) {
+      LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
+          + "and limitMaxVmemPerTasks is not configured. Scheduling based "
+          + "on job's memory requirements is disabled, ignoring any value "
+          + "set by job.");
+      return true;
+    }
+
+    TaskTrackerStatus.ResourceStatus resourceStatus =
+        taskTracker.getResourceStatus();
+    long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
+    long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
+
+    if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
+        || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+      return true;
+    }
+
+    if (reservedVMemOnTT > totalVMemOnTT) {
+      return true;
+    }
+
+    long jobVMemForTask = job.getMaxVirtualMemoryForTask();
+    if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      jobVMemForTask = scheduler.defaultMaxVmPerTask;
+    }
+
+    Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
+    long vmemUsedOnTT = memReservedForTasks.vmem;
+    long pmemUsedOnTT = memReservedForTasks.pmem;
+
+    long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+
+    if (jobVMemForTask > freeVmemUsedOnTT) {
+      return false;
+    }
+
+    // ////////////// pmem based scheduling
+
+    long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
+    long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
+    long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
+    long freePmemUsedOnTT = 0;
+
+    if (isSchedulingBasedOnPmemEnabled()) {
+      if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
+          || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+        return true;
+      }
+
+      if (reservedPmemOnTT > totalPmemOnTT) {
+        return true;
+      }
+
+      if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+        jobPMemForTask =
+            Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
+      }
+
+      freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
+
+      if (jobPMemForTask > freePmemUsedOnTT) {
+        return false;
+      }
+    } else {
+      LOG.debug("One of the configuration parameters "
+          + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
+          + "configured. Scheduling based on job's physical memory "
+          + "requirements is disabled, ignoring any value set by job.");
+    }
+
+    LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
+        + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
+        + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
+        + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+    return true;
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Dec  2 20:32:18 2008
@@ -31,13 +31,19 @@
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.conf.Configuration;
 
 
 public class TestCapacityScheduler extends TestCase {
-  
+
+  static final Log LOG =
+      LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
+
   private static int jobCounter;
   
   /**
@@ -147,8 +153,7 @@
       new HashSet<TaskInProgress>();
     
     public FakeJobInProgress(JobID jId, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) 
-    throws IOException {
+        FakeTaskTrackerManager taskTrackerManager, String user) {
       super(jId, jobConf);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
@@ -165,6 +170,8 @@
       }
       mapTaskCtr = 0;
       redTaskCtr = 0;
+      super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
+      super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
     }
     
     @Override
@@ -301,20 +308,23 @@
       new HashMap<String, TaskTrackerStatus>();
     private Map<String, TaskStatus> taskStatuses = 
       new HashMap<String, TaskStatus>();
+    private Map<JobID, JobInProgress> jobs =
+        new HashMap<JobID, JobInProgress>();
 
     public FakeTaskTrackerManager() {
-      this(2, 1);
+      this(2, 2, 1);
     }
-    
-    public FakeTaskTrackerManager(int maxMapSlots, int maxReduceSlots) {
-      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapSlots, maxReduceSlots));
-      maxMapTasksPerTracker = maxMapSlots;
-      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapSlots, maxReduceSlots));
-      maxReduceTasksPerTracker = maxReduceSlots;
+
+    public FakeTaskTrackerManager(int numTaskTrackers,
+        int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
+      this.maxMapTasksPerTracker = maxMapTasksPerTracker;
+      this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
+      for (int i = 1; i < numTaskTrackers + 1; i++) {
+        String ttName = "tt" + i;
+        trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
+            new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
+            maxReduceTasksPerTracker));
+      }
     }
     
     public void addTaskTracker(String ttName) {
@@ -338,7 +348,23 @@
     public int getNextHeartbeatInterval() {
       return MRConstants.HEARTBEAT_INTERVAL_MIN;
     }
-    
+
+    @Override
+    public void killJob(JobID jobid) throws IOException {
+      JobInProgress job = jobs.get(jobid);
+      finalizeJob(job, JobStatus.KILLED);
+      job.kill();
+    }
+
+    @Override
+    public JobInProgress getJob(JobID jobid) {
+      return jobs.get(jobid);
+    }
+
+    Collection<JobInProgress> getJobs() {
+      return jobs.values();
+    }
+
     public Collection<TaskTrackerStatus> taskTrackers() {
       return trackers.values();
     }
@@ -352,7 +378,8 @@
       listeners.remove(listener);
     }
     
-    public void submitJob(JobInProgress job) {
+    public void submitJob(JobInProgress job) throws IOException {
+      jobs.put(job.getJobID(), job);
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }
@@ -370,6 +397,11 @@
       }
       TaskStatus status = new TaskStatus() {
         @Override
+        public TaskAttemptID getTaskID() {
+          return t.getTaskID();
+        }
+
+        @Override
         public boolean getIsMap() {
           return t.isMapTask();
         }
@@ -393,9 +425,13 @@
     }
     
     void finalizeJob(FakeJobInProgress fjob) {
+      finalizeJob(fjob, JobStatus.SUCCEEDED);
+    }
+
+    void finalizeJob(JobInProgress fjob, int state) {
       // take a snapshot of the status before changing it
       JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      fjob.getStatus().setRunState(JobStatus.SUCCEEDED);
+      fjob.getStatus().setRunState(state);
       JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
       JobStatusChangeEvent event = 
         new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
@@ -539,9 +575,16 @@
   private FakeClock clock;
 
   @Override
-  protected void setUp() throws Exception {
+  protected void setUp() {
+    setUp(2, 2, 1);
+  }
+
+  private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
+      int numReduceTasksPerTracker) {
     jobCounter = 0;
-    taskTrackerManager = new FakeTaskTrackerManager();
+    taskTrackerManager =
+        new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
+            numReduceTasksPerTracker);
     clock = new FakeClock();
     scheduler = new CapacityTaskScheduler(clock);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -559,7 +602,24 @@
       scheduler.terminate();
     }
   }
-  
+
+  private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
+    FakeJobInProgress job =
+        new FakeJobInProgress(new JobID("test", ++jobCounter),
+            (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+            jobConf.getUser());
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    return job;
+  }
+
+  private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
+      throws IOException {
+    FakeJobInProgress j = submitJob(state, jobConf);
+    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    return j;
+  }
+
   private FakeJobInProgress submitJob(int state, int maps, int reduces, 
       String queue, String user) throws IOException {
     JobConf jobConf = new JobConf(conf);
@@ -567,11 +627,8 @@
     jobConf.setNumReduceTasks(reduces);
     if (queue != null)
       jobConf.setQueueName(queue);
-    FakeJobInProgress job = new FakeJobInProgress(
-        new JobID("test", ++jobCounter), jobConf, taskTrackerManager, user);
-    job.getStatus().setRunState(state);
-    taskTrackerManager.submitJob(job);
-    return job;
+    jobConf.setUser(user);
+    return submitJob(state, jobConf);
   }
   
   // Submit a job and update the listeners
@@ -1285,6 +1342,344 @@
     assertEquals(schedulingInfo, schedulingInfo2);   
   }
 
+  /**
+   * Test to verify that highMemoryJobs are scheduled like all other jobs when
+   * memory-based scheduling is not enabled.
+   * @throws IOException
+   */
+  public void testDisabledMemoryBasedScheduling()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+
+    // Limited TT - 1GB vmem and 512MB pmem
+    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
+        .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
+    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
+        .setTotalPhysicalMemory(512 * 1024 * 1024L);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // memory-based scheduling disabled by default.
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
+        + "and 1 reduce task.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
+    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.RUNNING, jConf);
+
+    // assert that all tasks are launched even though they transgress the
+    // scheduling limits.
+
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+  }
+
+  /**
+   * Test to verify that highPmemJobs are scheduled like all other jobs when
+   * physical-memory based scheduling is not enabled.
+   * @throws IOException
+   */
+  public void testDisabledPmemBasedScheduling()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+
+    // Limited TT - 100GB vmem and 500MB pmem
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enable vmem-based scheduling. pmem based scheduling disabled by default.
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        3 * 1024 * 1024 * 1024L);
+    scheduler.start();
+
+    LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
+        + "and 1 reduce task.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
+    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.RUNNING, jConf);
+
+    // assert that all tasks are launched even though they transgress the
+    // scheduling limits.
+
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+  }
+
+  /**
+   * Test HighMemoryJobs.
+   * @throws IOException
+   */
+  public void testHighMemoryJobs()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+    // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        3 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
+        + "1 map task and 1 reduce task.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
+    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+    // No more tasks of this job can run on the TT because of lack of vmem
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+
+    LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
+        + "1 map task and 0 reduces.");
+    jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.PREP, jConf); // job2
+
+    // This job shouldn't run on the TT now because of lack of pmem
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // Let attempt_test_0001_r_000001_0 finish, task assignment should succeed.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+    LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
+        + "0 maps and 1 reduce task.");
+    jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
+    jConf.setNumMapTasks(0);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.PREP, jConf); // job3
+
+    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+  }
+
+  /**
+   * test invalid highMemoryJobs
+   * @throws IOException
+   */
+  public void testHighMemoryJobWithInvalidRequirements()
+      throws IOException {
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
+    long vmemDefault = 1536 * 1024 * 1024L;
+    long pmemUpperLimit = vmemUpperLimit;
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        vmemDefault);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        vmemUpperLimit);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
+    scheduler.setResourceManagerConf(resConf);
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+        scheduler.jobQueuesManager,
+        resConf,
+        resConf.getQueues());
+    scheduler.setInitializationPoller(p);
+    scheduler.start();
+
+    LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
+        + "1 map, 0 reduce tasks.");
+    long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
+    long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
+    jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+
+    boolean throwsException = false;
+    String msg = null;
+    FakeJobInProgress job;
+    try {
+      job = submitJob(JobStatus.PREP, jConf);
+    } catch (IOException ioe) {
+      // job has to fail
+      throwsException = true;
+      msg = ioe.getMessage();
+    }
+
+    assertTrue(throwsException);
+    job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
+    assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
+        + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
+        + vmemUpperLimit + "vmem, " + pmemUpperLimit
+        + "pmem\\). Cannot run in this cluster, so killing it."));
+    // For job, no cleanup task needed so gets killed immediately.
+    assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+  }
+
+  /**
+   * Test blocking of cluster for lack of memory.
+   * @throws IOException
+   */
+  public void testClusterBlockingForLackOfMemory()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        4 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
+        + "1 map, 0 reduce tasks.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    // TTs should not run these jobs i.e. cluster blocked because of lack of
+    // vmem
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // Job should still be alive
+    assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
+
+    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
+    // Use cluster-wide defaults
+    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // cluster should still be blocked for job1 and so even job2 should not run
+    // even though it is a normal job
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    scheduler.taskTrackerManager.killJob(job2.getJobID());
+    scheduler.taskTrackerManager.killJob(job1.getJobID());
+
+    LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+        + "1 map, 0 reduce tasks.");
+    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+    FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
+    // TTs should not run these jobs i.e. cluster blocked because of lack of
+    // pmem now.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    
+    // Job should still be alive
+    assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
+
+    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
+    // Use cluster-wide defaults
+    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    submitJobAndInit(JobStatus.PREP, jConf); // job4
+
+    // cluster should still be blocked for job3 and so even job4 should not run
+    // even though it is a normal job
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
   protected TaskTrackerStatus tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }
@@ -1311,7 +1706,7 @@
   public void testJobInitialization() throws Exception {
     // set up the scheduler
     String[] qs = { "default" };
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1);
+    taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
@@ -1580,5 +1975,4 @@
     return userJobs;
 
   }
-  
 }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Dec  2 20:32:18 2008
@@ -161,10 +161,20 @@
     public int getNextHeartbeatInterval() {
       return MRConstants.HEARTBEAT_INTERVAL_MIN;
     }
-    
+
+    @Override
+    public void killJob(JobID jobid) {
+      return;
+    }
+
+    @Override
+    public JobInProgress getJob(JobID jobid) {
+      return null;
+    }
+
     // Test methods
     
-    public void submitJob(JobInProgress job) {
+    public void submitJob(JobInProgress job) throws IOException {
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java?rev=722760&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java Tue Dec  2 20:32:18 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Plugin to calculate virtual and physical memories on Linux systems.
+ */
+public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
+  private static final Log LOG =
+      LogFactory.getLog(LinuxMemoryCalculatorPlugin.class);
+
+  /**
+   * proc's meminfo virtual file has keys-values in the format
+   * "key:[ \t]*value[ \t]kB".
+   */
+  private static final String PROCFS_MEMFILE = "/proc/meminfo";
+  private static final Pattern PROCFS_MEMFILE_FORMAT =
+      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
+
+  // We just need the values for the keys MemTotal and SwapTotal
+  private static final String MEMTOTAL_STRING = "MemTotal";
+  private static final String SWAPTOTAL_STRING = "SwapTotal";
+
+  private long ramSize = 0;
+  private long swapSize = 0;
+
+  boolean readMemInfoFile = false;
+
+  private void readProcMemInfoFile() {
+
+    if (readMemInfoFile) {
+      return;
+    }
+
+    // Read "/proc/meminfo" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(PROCFS_MEMFILE);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+
+    Matcher mat = null;
+
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
+        if (mat.find()) {
+          if (mat.group(1).equals(MEMTOTAL_STRING)) {
+            ramSize = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
+            swapSize = Long.parseLong(mat.group(2));
+          }
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+
+    readMemInfoFile = true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPhysicalMemorySize() {
+    readProcMemInfoFile();
+    return ramSize * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getVirtualMemorySize() {
+    readProcMemInfoFile();
+    return (ramSize + swapSize) * 1024;
+  }
+
+  /**
+   * Test the {@link LinuxMemoryCalculatorPlugin}
+   * 
+   * @param args
+   */
+  public static void main(String[] args) {
+    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
+    System.out.println("Physical memory Size(bytes) : "
+        + plugin.getPhysicalMemorySize());
+    System.out.println("Total Virtual memory Size(bytes) : "
+        + plugin.getVirtualMemorySize());
+  }
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java?rev=722760&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java Tue Dec  2 20:32:18 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * Plugin to calculate virtual and physical memories on the system.
+ * 
+ */
+public abstract class MemoryCalculatorPlugin extends Configured {
+
+  /**
+   * Obtain the total size of the virtual memory present in the system.
+   * 
+   * @return virtual memory size in bytes.
+   */
+  public abstract long getVirtualMemorySize();
+
+  /**
+   * Obtain the total size of the physical memory present in the system.
+   * 
+   * @return physical memory size in bytes.
+   */
+  public abstract long getPhysicalMemorySize();
+
+  /**
+   * Get the MemoryCalculatorPlugin from the class name and configure it. If
+   * class name is null, this method will try and return a memory calculator
+   * plugin available for this system.
+   * 
+   * @param clazz class-name
+   * @param conf configure the plugin with this.
+   * @return MemoryCalculatorPlugin
+   */
+  public static MemoryCalculatorPlugin getMemoryCalculatorPlugin(
+      Class<? extends MemoryCalculatorPlugin> clazz, Configuration conf) {
+
+    if (clazz != null) {
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    // No class given, try an OS-specific class
+    try {
+      String osName = System.getProperty("os.name");
+      if (osName.startsWith("Linux")) {
+        return new LinuxMemoryCalculatorPlugin();
+      }
+    } catch (SecurityException se) {
+      // Failed to get Operating System name.
+      return null;
+    }
+
+    // Not supported on this system.
+    return null;
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Tue Dec  2 20:32:18 2008
@@ -188,7 +188,7 @@
    * Get the cumulative virtual memory used by all the processes in the
    * process-tree.
    * 
-   * @return cumulative virtual memory used by the process-tree in kilobytes.
+   * @return cumulative virtual memory used by the process-tree in bytes.
    */
   public long getCumulativeVmem() {
     long total = 0;
@@ -197,7 +197,7 @@
         total += p.getVmem();
       }
     }
-    return total/1024;
+    return total;
   }
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Dec  2 20:32:18 2008
@@ -52,8 +52,11 @@
                  so that the TaskTracker can synchronize itself.
    * Version 20: Changed status message due to changes in TaskStatus
    *             (HADOOP-4232)
+   * Version 21: Changed information reported in TaskTrackerStatus'
+   *             ResourceStatus and the corresponding accessor methods
+   *             (HADOOP-4035)
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 21L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue Dec  2 20:32:18 2008
@@ -108,14 +108,120 @@
    * A value which if set for memory related configuration options,
    * indicates that the options are turned off.
    */
-  static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L;
+  public static final long DISABLED_MEMORY_LIMIT = -1L;
   
   /**
    * Name of the queue to which jobs will be submitted, if no queue
    * name is mentioned.
    */
   public static final String DEFAULT_QUEUE_NAME = "default";
-  
+
+  /**
+   * Cluster-wide configuration to be set by the administrators that provides
+   * default amount of maximum virtual memory for job's tasks. This has to be
+   * set on both the JobTracker node for the sake of scheduling decisions and on
+   * the TaskTracker nodes for the sake of memory management.
+   * 
+   * <p>
+   * 
+   * If a job doesn't specify its virtual memory requirement by setting
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
+   * tasks are assured a memory limit set to this property. This property is
+   * disabled by default, and if not explicitly set to a valid value by the
+   * administrators and if a job doesn't specify its virtual memory
+   * requirements, the job's tasks will not be assured anything and may be
+   * killed by a TT that intends to control the total memory usage of the tasks
+   * via memory management functionality.
+   * 
+   * <p>
+   * 
+   * This value should in general be less than the cluster-wide configuration
+   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set,
+   * TaskTracker's memory management may be disabled and a scheduler's memory
+   * based scheduling decisions will be affected. Please refer to the
+   * documentation of the configured scheduler to see how this property is used.
+   */
+  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
+      "mapred.task.default.maxvmem";
+
+  /**
+   * The maximum amount of memory any task of this job will use.
+   * 
+   * <p>
+   * 
+   * This value will be used by TaskTrackers for monitoring the memory usage of
+   * tasks of this jobs. If a TaskTracker's memory management functionality is
+   * enabled, each task of this job will be allowed to use a maximum virtual
+   * memory specified by this property. If the task's memory usage goes over
+   * this value, the task will be failed by the TT. If not set, the cluster-wide
+   * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
+   * default value for memory requirements. If this property cascaded with
+   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
+   * tasks will not be assured anything and may be killed by a TT that intends
+   * to control the total memory usage of the tasks via memory management
+   * functionality. If the memory management functionality is disabled on a TT,
+   * this value is ignored.
+   * 
+   * <p>
+   * 
+   * This value should also be not more than the cluster-wide configuration
+   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
+   * administrators.
+   * 
+   * <p>
+   * 
+   * This value may be used by schedulers that support scheduling based on job's
+   * memory requirements. In general, a task of this job will be scheduled on a
+   * TaskTracker only if the amount of virtual memory still unoccupied on the
+   * TaskTracker is greater than or equal to this value. But different
+   * schedulers can take different decisions. Please refer to the documentation
+   * of the scheduler being configured to see if it does memory based scheduling
+   * and if it does, how this property is used by that scheduler.
+   * 
+   * @see #setMaxVirtualMemoryForTask(long)
+   * @see #getMaxVirtualMemoryForTask()
+   */
+  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
+      "mapred.task.maxvmem";
+
+  /**
+   * The maximum amount of physical memory any task of a job will use.
+   * 
+   * <p>
+   * 
+   * This value may be used by schedulers that support scheduling based on job's
+   * memory requirements. In general, a task of this job will be scheduled on a
+   * TaskTracker, only if the amount of physical memory still unoccupied on the
+   * TaskTracker is greater than or equal to this value. But different
+   * schedulers can take different decisions. Please refer to the documentation
+   * of the scheduler being configured to see how it does memory based
+   * scheduling and how this variable is used by that scheduler.
+   * 
+   * @see #setMaxPhysicalMemoryForTask(long)
+   * @see #getMaxPhysicalMemoryForTask()
+   */
+  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
+      "mapred.task.maxpmem";
+
+  /**
+   * Cluster-wide configuration to be set by the site administrators that
+   * provides an upper limit on the maximum virtual memory that can be specified
+   * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
+   * the cluster-wide configuration
+   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
+   * less than this value. If the job configuration
+   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
+   * depending on the scheduler being configured, the job may be rejected or the
+   * job configuration may just be ignored.
+   * 
+   * <p>
+   * 
+   * If it is not set on a TaskTracker, TaskTracker's memory management will be
+   * disabled.
+   */
+  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
+      "mapred.task.limit.maxvmem";
+
   /**
    * Construct a map/reduce job configuration.
    */
@@ -1346,35 +1452,53 @@
   }
   
   /**
-   * The maximum amount of memory any task of this job will use.
+   * The maximum amount of memory any task of this job will use. See
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * 
-   * A task of this job will be scheduled on a tasktracker, only if the
-   * amount of free memory on the tasktracker is greater than 
-   * or equal to this value.
+   * @return The maximum amount of memory any task of this job will use, in
+   *         bytes.
+   * @see #setMaxVirtualMemoryForTask(long)
+   */
+  public long getMaxVirtualMemoryForTask() {
+    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Set the maximum amount of memory any task of this job can use. See
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * 
-   * If set to {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, tasks are assured 
-   * a memory limit set to mapred.task.default.maxmemory. If the value of
-   * mapred.tasktracker.tasks.maxmemory is set to -1, this value is 
-   * ignored.
+   * @param vmem Maximum amount of virtual memory in bytes any task of this job
+   *          can use.
+   * @see #getMaxVirtualMemoryForTask()
+   */
+  public void setMaxVirtualMemoryForTask(long vmem) {
+    setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
+  }
+
+  /**
+   * The maximum amount of physical memory any task of this job will use. See
+   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
    * 
-   * @return The maximum amount of memory any task of this job will use, in kilobytes.
-   * @see #getMaxVirtualMemoryForTasks()
+   * @return The maximum amount of physical memory any task of this job will
+   *         use, in bytes.
+   * @see #setMaxPhysicalMemoryForTask(long)
    */
-  long getMaxVirtualMemoryForTask() {
-    return getLong("mapred.task.maxmemory", DISABLED_VIRTUAL_MEMORY_LIMIT);
+  public long getMaxPhysicalMemoryForTask() {
+    return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
   }
-  
+
   /**
-   * Set the maximum amount of memory any task of this job can use.
+   * Set the maximum amount of physical memory any task of this job can use. See
+   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
    * 
-   * @param vmem Maximum amount of memory in kilobytes any task of this job 
-   * can use.
-   * @see #getMaxVirtualMemoryForTask()
+   * @param pmem Maximum amount of physical memory in bytes any task of this job
+   *          can use.
+   * @see #getMaxPhysicalMemoryForTask()
    */
-  void setMaxVirtualMemoryForTask(long vmem) {
-    setLong("mapred.task.maxmemory", vmem);
+  public void setMaxPhysicalMemoryForTask(long pmem) {
+    setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
   }
-  
+
   /**
    * Return the name of the queue to which this job is submitted.
    * Defaults to 'default'.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Dec  2 20:32:18 2008
@@ -150,6 +150,7 @@
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
   private long maxVirtualMemoryForTask;
+  private long maxPhysicalMemoryForTask;
   
   // Per-job counters
   public static enum Counter { 
@@ -245,7 +246,8 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
-    this.maxVirtualMemoryForTask = conf.getMaxVirtualMemoryForTask();
+    setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
+    setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
   }
 
   /**
@@ -349,6 +351,8 @@
       jobInitKillStatus.initStarted = true;
     }
 
+    LOG.debug("initializing " + this.jobId);
+
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
                                     this.startTime);
@@ -533,10 +537,22 @@
   }
 
   // Accessors for resources.
-  public long getMaxVirtualMemoryForTask() {
+  long getMaxVirtualMemoryForTask() {
     return maxVirtualMemoryForTask;
   }
-  
+
+  void setMaxVirtualMemoryForTask(long maxVMem) {
+    maxVirtualMemoryForTask = maxVMem;
+  }
+
+  long getMaxPhysicalMemoryForTask() {
+    return maxPhysicalMemoryForTask;
+  }
+
+  void setMaxPhysicalMemoryForTask(long maxPMem) {
+    maxPhysicalMemoryForTask = maxPMem;
+  }
+
   // Update the job start/launch time (upon restart) and log to history
   synchronized void updateJobInfo(long startTime, long launchTime, int count) {
     // log and change to the job's start/launch time

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java Tue Dec  2 20:32:18 2008
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
 /**
  * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
  * {@link JobTracker}.
@@ -26,8 +28,9 @@
   /**
    * Invoked when a new job has been added to the {@link JobTracker}.
    * @param job The added job.
+   * @throws IOException 
    */
-  public abstract void jobAdded(JobInProgress job);
+  public abstract void jobAdded(JobInProgress job) throws IOException;
 
   /**
    * Invoked when a job has been removed from the {@link JobTracker}.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Dec  2 20:32:18 2008
@@ -2247,7 +2247,12 @@
       synchronized (taskScheduler) {
         jobs.put(job.getProfile().getJobID(), job);
         for (JobInProgressListener listener : jobInProgressListeners) {
-          listener.jobAdded(job);
+          try {
+            listener.jobAdded(job);
+          } catch (IOException ioe) {
+            LOG.warn("Failed to add and so skipping the job : "
+                + job.getJobID() + ". Exception : " + ioe);
+          }
         }
       }
     }



Mime
View raw message