hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r722760 [2/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/...
Date Wed, 03 Dec 2008 04:32:19 GMT
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Tue Dec  2 20:32:18
2008
@@ -291,7 +291,7 @@
       if (tracker.isTaskMemoryManagerEnabled()) {
         tracker.getTaskMemoryManager().addTask(
             TaskAttemptID.forName(env.conf.get("mapred.task.id")),
-            tracker.getMemoryForTask(env.conf));
+            tracker.getVirtualMemoryForTask(env.conf));
       }
       setRunningTaskForJvm(jvmRunner.jvmId, t);
       LOG.info(jvmRunner.getName());

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue
Dec  2 20:32:18 2008
@@ -60,7 +60,9 @@
     tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
-    maxMemoryAllowedForAllTasks = taskTracker.getMaxVirtualMemoryForTasks();
+    maxMemoryAllowedForAllTasks =
+        taskTracker.getTotalVirtualMemoryOnTT()
+            - taskTracker.getReservedVirtualMemory();
 
     monitoringInterval = taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
@@ -72,9 +74,6 @@
   public void addTask(TaskAttemptID tid, long memLimit) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      // TODO: Negative values must have been checked in JobConf.
-      memLimit = (memLimit < 0 ? JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
-          : memLimit);
       ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
           sleepTimeBeforeSigKill);
       tasksToBeAdded.put(tid, ptInfo);
@@ -203,15 +202,25 @@
         long currentMemUsage = pTree.getCumulativeVmem();
         long limit = ptInfo.getMemLimit();
         LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage
-            + "kB. Limit : " + limit + "kB");
+            + "bytes. Limit : " + limit + "bytes");
 
-        if (limit != JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+        if (limit > taskTracker.getLimitMaxVMemPerTask()) {
+          // TODO: With monitoring enabled and no scheduling based on
+          // memory,users can seriously hijack the system by specifying memory
+          // requirements well above the cluster wide limit. Ideally these jobs
+          // should have been rejected by JT/scheduler. Because we can't do
+          // that, in the minimum we should fail the tasks and hence the job.
+          LOG.warn("Task " + tid
+              + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
+        }
+
+        if (limit != JobConf.DISABLED_MEMORY_LIMIT
             && currentMemUsage > limit) {
           // Task (the root process) is still alive and overflowing memory.
           // Clean up.
           String msg = "TaskTree [pid=" + pId + ",tipID=" + tid
               + "] is running beyond memory-limits. Current usage : "
-              + currentMemUsage + "kB. Limit : " + limit + "kB. Killing task.";
+              + currentMemUsage + "bytes. Limit : " + limit + "bytes. Killing task.";
           LOG.warn(msg);
           taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 
@@ -227,7 +236,7 @@
       }
 
       LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
-          + "kB. Total limit : " + maxMemoryAllowedForAllTasks);
+          + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
 
       if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
         LOG.warn("The total memory usage is still overflowing TTs limits."

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Dec  2 20:32:18
2008
@@ -76,6 +76,7 @@
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
@@ -185,10 +186,64 @@
   volatile JvmManager jvmManager;
   
   private TaskMemoryManagerThread taskMemoryManager;
-  private boolean taskMemoryManagerEnabled = false;
-  private long maxVirtualMemoryForTasks 
-                                    = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
-  
+  private boolean taskMemoryManagerEnabled = true;
+  private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
+
+  // Cluster wide default value for max-vm per task
+  private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
+  // Cluster wide upper limit on max-vm per task
+  private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
+
+  /**
+   * Configuration property to specify the amount of virtual memory that has to
+   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
+   * virtual memory should be a part of the total virtual memory available on
+   * the TaskTracker. TaskTracker obtains the total virtual memory available on
+   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
+   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
+   * MemoryCalculatorPlugin implementation.
+   * 
+   * <p>
+   * 
+   * The reserved virtual memory and the total virtual memory values are
+   * reported by the TaskTracker as part of heart-beat so that they can
+   * considered by a scheduler.
+   * 
+   * <p>
+   * 
+   * These two values are also used by the TaskTracker for tracking tasks'
+   * memory usage. Memory management functionality on a TaskTracker is disabled
+   * if this property is not set, if it more than the total virtual memory
+   * reported by MemoryCalculatorPlugin, or if either of the values is negative.
+   */
+  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
+      "mapred.tasktracker.vmem.reserved";
+
+  /**
+   * Configuration property to specify the amount of physical memory that has to
+   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
+   * physical memory should be a part of the total physical memory available on
+   * the TaskTracker. TaskTracker obtains the total physical memory available on
+   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
+   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
+   * MemoryCalculatorPlugin implementation.
+   * 
+   * <p>
+   * 
+   * The reserved virtual memory and the total virtual memory values are
+   * reported by the TaskTracker as part of heart-beat so that they can
+   * considered by a scheduler.
+   * 
+   */
+  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
+      "mapred.tasktracker.pmem.reserved";
+
+  static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
+      "mapred.tasktracker.memory_calculator_plugin";
+
   /**
    * the minimum interval between jobtracker polls
    */
@@ -460,17 +515,11 @@
                              "Map-events fetcher for all reduce tasks " + "on " + 
                              taskTrackerName);
     mapEventsFetcher.start();
-    maxVirtualMemoryForTasks = fConf.
-                                  getLong("mapred.tasktracker.tasks.maxmemory",
-                                          JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT);
+
+    initializeMemoryManagement();
+
     this.indexCache = new IndexCache(this.fConf);
-    // start the taskMemoryManager thread only if enabled
-    setTaskMemoryManagerEnabledFlag();
-    if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager = new TaskMemoryManagerThread(this);
-      taskMemoryManager.setDaemon(true);
-      taskMemoryManager.start();
-    }
+
     mapLauncher = new TaskLauncher(maxCurrentMapTasks);
     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
     mapLauncher.start();
@@ -1139,12 +1188,17 @@
     if (askForNewTask) {
       checkLocalDirs(fConf.getLocalDirs());
       askForNewTask = enoughFreeSpace(localMinSpaceStart);
-      status.getResourceStatus().setAvailableSpace( getFreeSpace() );
-      long freeVirtualMem = findFreeVirtualMemory();
-      LOG.debug("Setting amount of free virtual memory for the new task: " +
-                    freeVirtualMem);
-      status.getResourceStatus().setFreeVirtualMemory(freeVirtualMem);
-      status.getResourceStatus().setTotalMemory(maxVirtualMemoryForTasks);
+      long freeDiskSpace = getFreeSpace();
+      long totVmem = getTotalVirtualMemoryOnTT();
+      long totPmem = getTotalPhysicalMemoryOnTT();
+      long rsrvdVmem = getReservedVirtualMemory();
+      long rsrvdPmem = getReservedPhysicalMemory();
+
+      status.getResourceStatus().setAvailableSpace(freeDiskSpace);
+      status.getResourceStatus().setTotalVirtualMemory(totVmem);
+      status.getResourceStatus().setTotalPhysicalMemory(totPmem);
+      status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
+      status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
     }
       
     //
@@ -1192,67 +1246,67 @@
   }
 
   /**
-   * Return the maximum amount of memory available for all tasks on 
-   * this tracker
-   * @return maximum amount of virtual memory
+   * Return the total virtual memory available on this TaskTracker.
+   * @return total size of virtual memory.
    */
-  long getMaxVirtualMemoryForTasks() {
-    return maxVirtualMemoryForTasks;
+  long getTotalVirtualMemoryOnTT() {
+    return totalVirtualMemoryOnTT;
   }
-  
+
   /**
-   * Find the minimum amount of virtual memory that would be
-   * available for a new task.
-   * 
-   * The minimum amount of virtual memory is computed by looking
-   * at the maximum amount of virtual memory that is allowed for
-   * all tasks in the system, as per mapred.tasktracker.tasks.maxmemory,
-   * and the total amount of maximum virtual memory that can be
-   * used by all currently running tasks.
-   * 
-   * @return amount of free virtual memory that can be assured for
-   * new tasks
+   * Return the total physical memory available on this TaskTracker.
+   * @return total size of physical memory.
    */
-  private synchronized long findFreeVirtualMemory() {
-  
-    if (maxVirtualMemoryForTasks == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
-      // this will disable picking up tasks based on free memory.
-      return JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
-    }
-  
-    long maxMemoryUsed = 0L;
-    for (TaskInProgress tip: runningTasks.values()) {
-      // the following task states are one in which the slot is
-      // still occupied and hence memory of the task should be
-      // accounted in used memory.
-      if ((tip.getRunState() == TaskStatus.State.RUNNING)
-            || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        maxMemoryUsed += getMemoryForTask(tip.getJobConf());
-      }
-    }
-  
-    return (maxVirtualMemoryForTasks - maxMemoryUsed);
+  long getTotalPhysicalMemoryOnTT() {
+    return totalPmemOnTT;
+  }
+
+  /**
+   * Return the amount of virtual memory reserved on the TaskTracker for system
+   * usage (OS, TT etc).
+   */
+  long getReservedVirtualMemory() {
+    return reservedVirtualMemory;
+  }
+
+  /**
+   * Return the amount of physical memory reserved on the TaskTracker for system
+   * usage (OS, TT etc).
+   */
+  long getReservedPhysicalMemory() {
+    return reservedPmem;
   }
 
   /**
-   * Return the memory allocated for a TIP.
+   * Return the limit on the maxVMemPerTask on this TaskTracker
+   * @return limitMaxVmPerTask
+   */
+  long getLimitMaxVMemPerTask() {
+    return limitMaxVmPerTask;
+  }
+
+  /**
+   * Obtain the virtual memory allocated for a TIP.
+   * 
+   * If the TIP's job has a configured value for the max-virtual memory, that
+   * will be returned. Else, the cluster-wide default maxvirtual memory for
+   * tasks is returned.
    * 
-   * If the TIP's job has a configured value for the max memory that is
-   * returned. Else, the default memory that would be assigned for the
-   * task is returned.
    * @param conf
-   * @return the memory allocated for the TIP.
+   * @return the virtual memory allocated for the TIP.
    */
-  long getMemoryForTask(JobConf conf) {
-    long memForTask = conf.getMaxVirtualMemoryForTask();
-    if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
-      memForTask = fConf.getLong("mapred.task.default.maxmemory",
-                          512*1024*1024L);
+  long getVirtualMemoryForTask(JobConf conf) {
+    long vMemForTask =
+        normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
+    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      vMemForTask =
+          normalizeMemoryConfigValue(fConf.getLong(
+              JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+              JobConf.DISABLED_MEMORY_LIMIT));
     }
-    return memForTask;
-  }  
-  
-  
+    return vMemForTask;
+  }
+
   /**
    * Check if the jobtracker directed a 'reset' of the tasktracker.
    * 
@@ -1634,7 +1688,7 @@
       localizeJob(tip);
       if (isTaskMemoryManagerEnabled()) {
         taskMemoryManager.addTask(tip.getTask().getTaskID(), 
-            getMemoryForTask(tip.getJobConf()));
+            getVirtualMemoryForTask(tip.getJobConf()));
       }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
@@ -2931,6 +2985,75 @@
     return taskMemoryManager;
   }
 
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized val
+   */
+  private long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
+  /**
+   * Memory-related setup
+   */
+  private void initializeMemoryManagement() {
+    Class<? extends MemoryCalculatorPlugin> clazz =
+        fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+            null, MemoryCalculatorPlugin.class);
+    MemoryCalculatorPlugin memoryCalculatorPlugin =
+        (MemoryCalculatorPlugin) MemoryCalculatorPlugin
+            .getMemoryCalculatorPlugin(clazz, fConf);
+    LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
+
+    if (memoryCalculatorPlugin != null) {
+      totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
+      if (totalVirtualMemoryOnTT <= 0) {
+        LOG.warn("TaskTracker's totalVmem could not be calculated. "
+            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+        totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      }
+      totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPmemOnTT <= 0) {
+        LOG.warn("TaskTracker's totalPmem could not be calculated. "
+            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+        totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      }
+    }
+
+    reservedVirtualMemory =
+        normalizeMemoryConfigValue(fConf.getLong(
+            TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    reservedPmem =
+        normalizeMemoryConfigValue(fConf.getLong(
+            TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    defaultMaxVmPerTask =
+        normalizeMemoryConfigValue(fConf.getLong(
+            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    limitMaxVmPerTask =
+        normalizeMemoryConfigValue(fConf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    // start the taskMemoryManager thread only if enabled
+    setTaskMemoryManagerEnabledFlag();
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager = new TaskMemoryManagerThread(this);
+      taskMemoryManager.setDaemon(true);
+      taskMemoryManager.start();
+    }
+  }
+
   private void setTaskMemoryManagerEnabledFlag() {
     if (!ProcfsBasedProcessTree.isAvailable()) {
       LOG.info("ProcessTree implementation is missing on this system. "
@@ -2939,13 +3062,55 @@
       return;
     }
 
-    Long tasksMaxMem = getMaxVirtualMemoryForTasks();
-    if (tasksMaxMem == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
-      LOG.info("TaskTracker's tasksMaxMem is not set. TaskMemoryManager is "
-          + "disabled.");
+    // /// Missing configuration
+    StringBuilder mesg = new StringBuilder();
+
+    long totalVmemOnTT = getTotalVirtualMemoryOnTT();
+    if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's totalVmem could not be calculated.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    long reservedVmem = getReservedVirtualMemory();
+    if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's reservedVmem is not configured.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (!taskMemoryManagerEnabled) {
+      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
+      return;
+    }
+    // ///// End of missing configuration
+
+    // ///// Mis-configuration
+    if (defaultMaxVmPerTask > limitMaxVmPerTask) {
+      mesg.append("defaultMaxVmPerTask is mis-configured. "
+          + "It shouldn't be greater than limitMaxVmPerTask. ");
       taskMemoryManagerEnabled = false;
+    }
+
+    if (reservedVmem > totalVmemOnTT) {
+      mesg.append("reservedVmemOnTT is mis-configured. "
+          + "It shouldn't be greater than totalVmemOnTT");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (!taskMemoryManagerEnabled) {
+      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
       return;
     }
+    // ///// End of mis-configuration
 
     taskMemoryManagerEnabled = true;
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Tue Dec
 2 20:32:18 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -70,5 +71,21 @@
    * @return the heartbeat interval used by {@link TaskTracker}s
    */
   public int getNextHeartbeatInterval();
-  
+
+  /**
+   * Kill the job identified by jobid
+   * 
+   * @param jobid
+   * @throws IOException
+   */
+  public void killJob(JobID jobid)
+      throws IOException;
+
+  /**
+   * Obtain the job object identified by jobid
+   * 
+   * @param jobid
+   * @return jobInProgress object
+   */
+  public JobInProgress getJob(JobID jobid);
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Dec 
2 20:32:18 2008
@@ -54,59 +54,99 @@
    */
   static class ResourceStatus implements Writable {
     
-    private long freeVirtualMemory;
-    private long totalMemory;
+    private long totalVirtualMemory;
+    private long reservedVirtualMemory;
+    private long totalPhysicalMemory;
+    private long reservedPhysicalMemory;
     private long availableSpace;
     
     ResourceStatus() {
-      freeVirtualMemory = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
-      totalMemory = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+      totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
       availableSpace = Long.MAX_VALUE;
     }
-    
+
     /**
-     * Set the amount of free virtual memory that is available for running
-     * a new task
-     * @param freeVMem amount of free virtual memory in kilobytes
+     * Set the maximum amount of virtual memory on the tasktracker.
+     * 
+     * @param vmem maximum amount of virtual memory on the tasktracker in bytes.
      */
-    void setFreeVirtualMemory(long freeVmem) {
-      freeVirtualMemory = freeVmem;
+    void setTotalVirtualMemory(long totalMem) {
+      totalVirtualMemory = totalMem;
     }
 
     /**
-     * Get the amount of free virtual memory that will be available for
-     * running a new task. 
+     * Get the maximum amount of virtual memory on the tasktracker.
      * 
-     * If this is {@link JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT}, it should 
-     * be ignored and not used in computation.
+     * If this is {@link JobConf#DISABLED_MEMORY_LIMIT}, it should be ignored
+     * and not used in any computation.
      * 
-     *@return amount of free virtual memory in kilobytes.
+     * @return the maximum amount of virtual memory on the tasktracker in bytes.
      */
-    long getFreeVirtualMemory() {
-      return freeVirtualMemory;
+    long getTotalVirtualMemory() {
+      return totalVirtualMemory;
     }
 
     /**
-     * Set the maximum amount of virtual memory on the tasktracker.
-     * @param vmem maximum amount of virtual memory on the tasktracker in kilobytes.
+     * Set the amount of virtual memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     * 
+     * @param reservedVmem amount of virtual memory reserved in bytes.
      */
-    void setTotalMemory(long totalMem) {
-      totalMemory = totalMem;
+    void setReservedVirtualMemory(long reservedVmem) {
+      reservedVirtualMemory = reservedVmem;
     }
-    
+
     /**
-     * Get the maximum amount of virtual memory on the tasktracker.
+     * Get the amount of virtual memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     */
+    long getReservedTotalMemory() {
+      return reservedVirtualMemory;
+    }
+
+    /**
+     * Set the maximum amount of physical memory on the tasktracker.
      * 
-     * If this is
-     * {@link JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT}, it should be ignored 
+     * @param totalRAM maximum amount of physical memory on the tasktracker in
+     *          bytes.
+     */
+    void setTotalPhysicalMemory(long totalRAM) {
+      totalPhysicalMemory = totalRAM;
+    }
+
+    /**
+     * Get the maximum amount of physical memory on the tasktracker.
+     * 
+     * If this is {@link JobConf#DISABLED_MEMORY_LIMIT}, it should be ignored
      * and not used in any computation.
      * 
-     * @return maximum amount of virtual memory on the tasktracker in kilobytes. 
-     */    
-    long getTotalMemory() {
-      return totalMemory;
+     * @return maximum amount of physical memory on the tasktracker in bytes.
+     */
+    long getTotalPhysicalMemory() {
+      return totalPhysicalMemory;
     }
-    
+
+    /**
+     * Set the amount of physical memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     * 
+     * @param reservedPmem amount of physical memory reserved in bytes.
+     */
+    void setReservedPhysicalMemory(long reservedPmem) {
+      reservedPhysicalMemory = reservedPmem;
+    }
+
+    /**
+     * Get the amount of physical memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     */
+    long getReservedPhysicalMemory() {
+      return reservedPhysicalMemory;
+    }
+
     void setAvailableSpace(long availSpace) {
       availableSpace = availSpace;
     }
@@ -120,15 +160,19 @@
     }
     
     public void write(DataOutput out) throws IOException {
-      WritableUtils.writeVLong(out, freeVirtualMemory);
-      WritableUtils.writeVLong(out, totalMemory);
+      WritableUtils.writeVLong(out, totalVirtualMemory);
+      WritableUtils.writeVLong(out, reservedVirtualMemory);
+      WritableUtils.writeVLong(out, totalPhysicalMemory);
+      WritableUtils.writeVLong(out, reservedPhysicalMemory);
       WritableUtils.writeVLong(out, availableSpace);
     }
     
     public void readFields(DataInput in) throws IOException {
-      freeVirtualMemory = WritableUtils.readVLong(in);;
-      totalMemory = WritableUtils.readVLong(in);;
-      availableSpace = WritableUtils.readVLong(in);;
+      totalVirtualMemory = WritableUtils.readVLong(in);
+      reservedVirtualMemory = WritableUtils.readVLong(in);
+      totalPhysicalMemory = WritableUtils.readVLong(in);
+      reservedPhysicalMemory = WritableUtils.readVLong(in);
+      availableSpace = WritableUtils.readVLong(in);
     }
   }
   

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java?rev=722760&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java Tue
Dec  2 20:32:18 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
+
+/**
+ * Plugin class to test virtual and physical memories reported by TT. Use
+ * configuration items {@link #MAXVMEM_TESTING_PROPERTY} and
+ * {@link #MAXPMEM_TESTING_PROPERTY} to tell TT the total vmem and the total
+ * pmem.
+ */
+public class DummyMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
+
+  /** max vmem on the TT */
+  public static final String MAXVMEM_TESTING_PROPERTY =
+      "mapred.tasktracker.maxvmem.testing";
+  /** max pmem on the TT */
+  public static final String MAXPMEM_TESTING_PROPERTY =
+      "mapred.tasktracker.maxpmem.testing";
+
+  /** {@inheritDoc} */
+  @Override
+  public long getVirtualMemorySize() {
+    return getConf().getLong(MAXVMEM_TESTING_PROPERTY, -1);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPhysicalMemorySize() {
+    return getConf().getLong(MAXPMEM_TESTING_PROPERTY, -1);
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue
Dec  2 20:32:18 2008
@@ -152,10 +152,20 @@
     public int getNextHeartbeatInterval() {
       return MRConstants.HEARTBEAT_INTERVAL_MIN;
     }
-    
+
+    @Override
+    public void killJob(JobID jobid) {
+      return;
+    }
+
+    @Override
+    public JobInProgress getJob(JobID jobid) {
+      return null;
+    }
+
     // Test methods
     
-    public void submitJob(JobInProgress job) {
+    public void submitJob(JobInProgress job) throws IOException {
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=722760&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java Tue Dec
 2 20:32:18 2008
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+/**
+ * This test class tests the functionality related to configuring, reporting
+ * and computing memory related parameters in a Map/Reduce cluster.
+ * 
+ * Each test sets up a {@link MiniMRCluster} with a locally defined 
+ * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates 
+ * the memory related configuration is correctly computed and reported from 
+ * the tasktracker in 
+ * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
+ */
+public class TestTTMemoryReporting extends TestCase {
+
+  static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
+  
+  private MiniDFSCluster miniDFSCluster;
+  private MiniMRCluster miniMRCluster;
+
+  /**
+   * Fake scheduler to test the proper reporting of memory values by TT
+   */
+  public static class FakeTaskScheduler extends JobQueueTaskScheduler {
+    
+    private boolean hasPassed = true;
+    private String message;
+    
+    public FakeTaskScheduler() {
+      super();
+    }
+    
+    public boolean hasTestPassed() {
+      return hasPassed;
+    }
+    
+    public String getFailureMessage() {
+      return message;
+    }
+    
+    @Override
+    public List<Task> assignTasks(TaskTrackerStatus status)
+        throws IOException {
+
+      long totalVirtualMemoryOnTT =
+          getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long totalPhysicalMemoryOnTT =
+          getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long virtualMemoryReservedOnTT =
+          getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long physicalMemoryReservedOnTT =
+          getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+
+      long reportedTotalVirtualMemoryOnTT =
+          status.getResourceStatus().getTotalVirtualMemory();
+      long reportedTotalPhysicalMemoryOnTT =
+          status.getResourceStatus().getTotalPhysicalMemory();
+      long reportedVirtualMemoryReservedOnTT =
+          status.getResourceStatus().getReservedTotalMemory();
+      long reportedPhysicalMemoryReservedOnTT =
+          status.getResourceStatus().getReservedPhysicalMemory();
+
+      message =
+          "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
+              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
+              + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
+              + ")";
+      message +=
+          "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
+              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + reportedTotalVirtualMemoryOnTT
+              + ", "
+              + reportedTotalPhysicalMemoryOnTT
+              + ", "
+              + reportedVirtualMemoryReservedOnTT
+              + ", "
+              + reportedPhysicalMemoryReservedOnTT + ")";
+      LOG.info(message);
+      if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
+          || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
+          || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
+          || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
+        hasPassed = false;
+      }
+      return super.assignTasks(status);
+    }
+  }
+
+  /**
+   * Test that verifies default values are configured and reported correctly.
+   * 
+   * @throws Exception
+   */
+  public void testDefaultMemoryValues()
+      throws Exception {
+    JobConf conf = new JobConf();
+    try {
+      // Memory values are disabled by default.
+      conf.setClass(
+          TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+          DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+      setUpCluster(conf);
+      runSleepJob();
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Test that verifies that configured values are reported correctly.
+   * 
+   * @throws Exception
+   */
+  public void testConfiguredMemoryValues()
+      throws Exception {
+    JobConf conf = new JobConf();
+    conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
+    conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
+    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
+    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    conf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        4 * 1024 * 1024 * 1024L);
+    conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
+        2 * 1024 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
+        1 * 1024 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
+        512 * 1024 * 1024L);
+    try {
+      setUpCluster(conf);
+      runSleepJob();
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Test that verifies that total memory values are calculated and reported
+   * correctly.
+   * 
+   * @throws Exception
+   */
+  public void testMemoryValuesOnLinux()
+      throws Exception {
+    if (!System.getProperty("os.name").startsWith("Linux")) {
+      return;
+    }
+
+    JobConf conf = new JobConf();
+    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
+    conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
+    conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
+    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
+    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
+        1 * 1024 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
+        512 * 1024 * 1024L);
+    try {
+      setUpCluster(conf);
+      runSleepJob();
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void setUpCluster(JobConf conf)
+                                throws Exception {
+    conf.setClass("mapred.jobtracker.taskScheduler", 
+        TestTTMemoryReporting.FakeTaskScheduler.class,
+        TaskScheduler.class);
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSys = miniDFSCluster.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
+                      null, null, conf);    
+  }
+  
+  private void runSleepJob() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("mapred.job.tracker", "localhost:"
+                              + miniMRCluster.getJobTrackerPort());
+    String[] args = { "-m", "1", "-r", "1",
+                      "-mt", "1000", "-rt", "1000" };
+    ToolRunner.run(conf, new SleepJob(), args);
+  }
+
+  private void verifyTestResults() {
+    FakeTaskScheduler scheduler = 
+      (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
+                              getJobTracker().getTaskScheduler();
+    assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
+  }
+  
+  private void tearDownCluster() {
+    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
+    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
(original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
Tue Dec  2 20:32:18 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -46,7 +47,7 @@
 
   private String taskOverLimitPatternString =
       "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
-          + "Current usage : [0-9]*kB. Limit : %skB. Killing task.";
+          + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
 
   private void startCluster(JobConf conf) throws Exception {
     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
@@ -168,14 +169,21 @@
       return;
     }
 
+    // Fairly large value for sleepJob to succeed
+    long ttLimit = 4 * 1024 * 1024 * 1024L;
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
 
-    // Fairly large value for sleepJob to succeed
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        ttLimit);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
     startCluster(fConf);
-
-    // Set up job.
     JobConf conf = new JobConf();
     runAndCheckSuccessfulJob(conf);
   }
@@ -193,19 +201,29 @@
       return;
     }
 
-    long PER_TASK_LIMIT = 10000000000L; // Large so sleepjob goes through.
-    long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+    // Large so that sleepjob goes through and fits total TT usage
+    long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
+    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
 
-    // Fairly large value for sleepjob to succeed
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
     startCluster(fConf);
-
     JobConf conf = new JobConf();
     conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
+
   }
 
   /**
@@ -223,7 +241,8 @@
     }
 
     long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
-    long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
+    // total usage
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, String
             .valueOf(PER_TASK_LIMIT)));
@@ -231,7 +250,17 @@
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
 
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
@@ -313,14 +342,25 @@
         Pattern.compile(String.format(taskOverLimitPatternString, String
             .valueOf(PER_TASK_LIMIT)));
     Pattern trackerOverLimitPattern =
-        Pattern.compile("Killing one of the least progress tasks - .*, as "
-            + "the cumulative memory usage of all the tasks on the TaskTracker"
-            + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+        Pattern
+            .compile("Killing one of the least progress tasks - .*, as "
+                + "the cumulative memory usage of all the tasks on the TaskTracker"
+                + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
     Matcher mat = null;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
@@ -374,4 +414,4 @@
     // Test succeeded, kill the job.
     job.killJob();
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=722760&r1=722759&r2=722760&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Tue
Dec  2 20:32:18 2008
@@ -39,7 +39,7 @@
   private String shellScript;
   private static final int N = 10; // Controls the RogueTask
 
-  private static final int memoryLimit = 15000; // kilobytes
+  private static final int memoryLimit = 15 * 1024 * 1024; // 15MB
   private static final long PROCESSTREE_RECONSTRUCTION_INTERVAL =
     ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL; // msec
 
@@ -125,7 +125,7 @@
       while (true) {
         LOG.info("ProcessTree: " + p.toString());
         long mem = p.getCumulativeVmem();
-        LOG.info("Memory usage: " + mem + "kB.");
+        LOG.info("Memory usage: " + mem + "bytes.");
         if (mem > memoryLimit) {
           p.destroy();
           break;



Mime
View raw message