hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r936166 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/test/mapred/org/apache/hadoop/mapred/
Date: Wed, 21 Apr 2010 06:14:53 GMT
Author: acmurthy
Date: Wed Apr 21 06:14:53 2010
New Revision: 936166

URL: http://svn.apache.org/viewvc?rev=936166&view=rev
Log:
MAPREDUCE-1221. Allow admins to control physical memory limits per-task and per-node. Contributed by Scott Chen.
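
Editor's note: for orientation, here is a minimal usage sketch of the three configuration keys this patch introduces, set from Java the same way the updated test below does. The 2 GB and 512 MB values are purely illustrative, and the class name is invented for this sketch. The reserved amount is subtracted from the node's total physical memory to form the tasktracker-wide budget; the per-task keys bound each attempt's resident set size.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

    public class PhysicalMemoryLimitsSketch {
      public static void main(String[] args) {
        // Node-level (tasktracker) setting: keep 2 GB of physical memory out
        // of the task budget, for the TT daemon and other processes on the node.
        JobConf ttConf = new JobConf();
        ttConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB, 2 * 1024L);

        // Job-level settings: cap each map and reduce attempt's resident (RSS)
        // memory at 512 MB.
        JobConf jobConf = new JobConf();
        jobConf.setLong(JobContext.MAP_MEMORY_PHYSICAL_MB, 512L);
        jobConf.setLong(JobContext.REDUCE_MEMORY_PHYSICAL_MB, 512L);
      }
    }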

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=936166&r1=936165&r2=936166&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Apr 21 06:14:53 2010
@@ -258,6 +258,9 @@ Trunk (unreleased changes)
     add random numbers to their names to avoid conflicts
     (Rodrigo Schmidt via dhruba)
 
+    MAPREDUCE-1221. Allow admins to control physical memory limits per-task
+    and per-node. (Scott Chen via acmurthy) 
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=936166&r1=936165&r2=936166&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Wed Apr 21 06:14:53 2010
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,6 +29,7 @@ import java.util.ArrayList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -46,20 +49,30 @@ class TaskMemoryManagerThread extends Th
   private long monitoringInterval;
 
   private long maxMemoryAllowedForAllTasks;
+  private long maxRssMemoryAllowedForAllTasks;
 
   private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
   private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
   private List<TaskAttemptID> tasksToBeRemoved;
 
   private static final String MEMORY_USAGE_STRING =
-    "Memory usage of ProcessTree %s for task-id %s : %d bytes, " +
-      "limit : %d bytes";
+    "Memory usage of ProcessTree %s for task-id %s : Virtual %d bytes, " +
+      "limit : %d bytes; Physical %d bytes, limit %d bytes";
   
   public TaskMemoryManagerThread(TaskTracker taskTracker) {
     this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
       taskTracker.getJobConf().getLong(
         TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 5000L));         
     this.taskTracker = taskTracker;
+    long reservedRssMemory = taskTracker.getReservedPhysicalMemoryOnTT();
+    long totalPhysicalMemoryOnTT = taskTracker.getTotalPhysicalMemoryOnTT();
+    if (reservedRssMemory == JobConf.DISABLED_MEMORY_LIMIT ||
+        totalPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+      maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
+    } else {
+      maxRssMemoryAllowedForAllTasks =
+                totalPhysicalMemoryOnTT - reservedRssMemory;
+    }
   }
 
   // mainly for test purposes. note that the tasktracker variable is
@@ -72,15 +85,17 @@ class TaskMemoryManagerThread extends Th
     tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
-    this.maxMemoryAllowedForAllTasks = maxMemoryAllowedForAllTasks;
+    this.maxMemoryAllowedForAllTasks = maxMemoryAllowedForAllTasks < 0 ?
+        JobConf.DISABLED_MEMORY_LIMIT : maxMemoryAllowedForAllTasks;
 
     this.monitoringInterval = monitoringInterval;
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit) {
+  public void addTask(TaskAttemptID tid, long memLimit, long memLimitPhysical) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit);
+      ProcessTreeInfo ptInfo =
+        new ProcessTreeInfo(tid, null, null, memLimit, memLimitPhysical);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -96,13 +111,15 @@ class TaskMemoryManagerThread extends Th
     private String pid;
     private ProcfsBasedProcessTree pTree;
     private long memLimit;
+    private long memLimitPhysical;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit) {
+        ProcfsBasedProcessTree pTree, long memLimit, long memLimitPhysical) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
       this.memLimit = memLimit;
+      this.memLimitPhysical = memLimitPhysical;
     }
 
     public TaskAttemptID getTID() {
@@ -128,6 +145,13 @@ class TaskMemoryManagerThread extends Th
     public long getMemLimit() {
       return memLimit;
     }
+
+    /**
+     * @return Physical memory limit for the process tree in bytes
+     */
+    public long getMemLimitPhysical() {
+      return memLimitPhysical;
+    }
   }
 
   @Override
@@ -162,6 +186,7 @@ class TaskMemoryManagerThread extends Th
       }
 
       long memoryStillInUsage = 0;
+      long rssMemoryStillInUsage = 0;
       // Now, check memory usage and kill any overflowing tasks
       for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it = processTreeInfoMap
           .entrySet().iterator(); it.hasNext();) {
@@ -202,6 +227,10 @@ class TaskMemoryManagerThread extends Th
             continue; // processTree cannot be tracked
           }
 
+          if (taskTracker.runningTasks.get(tid).wasKilled()) {
+            continue; // this task has been killed already
+          }
+
           LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
               + tid);
           ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
@@ -209,26 +238,49 @@ class TaskMemoryManagerThread extends Th
           ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
                                         // updated state
           long currentMemUsage = pTree.getCumulativeVmem();
+          long currentRssMemUsage = pTree.getCumulativeRssmem();
           // as processes begin with an age 1, we want to see if there 
           // are processes more than 1 iteration old.
           long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+          long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
           long limit = ptInfo.getMemLimit();
+          long limitPhysical = ptInfo.getMemLimitPhysical();
           LOG.info(String.format(MEMORY_USAGE_STRING, 
-                                pId, tid.toString(), currentMemUsage, limit));
+                                pId, tid.toString(), currentMemUsage, limit,
+                                currentRssMemUsage, limitPhysical));
 
-          if (isProcessTreeOverLimit(tid.toString(), currentMemUsage, 
+          boolean isMemoryOverLimit = false;
+          String msg = "";
+          if (doCheckVirtualMemory() &&
+              isProcessTreeOverLimit(tid.toString(), currentMemUsage,
                                       curMemUsageOfAgedProcesses, limit)) {
             // Task (the root process) is still alive and overflowing memory.
             // Dump the process-tree and then clean it up.
-            String msg =
-                "TaskTree [pid=" + pId + ",tipID=" + tid
+            msg = "TaskTree [pid=" + pId + ",tipID=" + tid
                     + "] is running beyond memory-limits. Current usage : "
                     + currentMemUsage + "bytes. Limit : " + limit
                     + "bytes. Killing task. \nDump of the process-tree for "
                     + tid + " : \n" + pTree.getProcessTreeDump();
+            isMemoryOverLimit = true;
+          } else if (doCheckPhysicalMemory() &&
+              isProcessTreeOverLimit(tid.toString(), currentRssMemUsage,
+                                curRssMemUsageOfAgedProcesses, limitPhysical)) {
+            // Task (the root process) is still alive and overflowing memory.
+            // Dump the process-tree and then clean it up.
+            msg = "TaskTree [pid=" + pId + ",tipID=" + tid
+                    + "] is running beyond physical memory-limits."
+                    + " Current usage : "
+                    + currentRssMemUsage + "bytes. Limit : " + limitPhysical
+                    + "bytes. Killing task. \nDump of the process-tree for "
+                    + tid + " : \n" + pTree.getProcessTreeDump();
+            isMemoryOverLimit = true;
+          }
+
+          if (isMemoryOverLimit) {
+            // Virtual or physical memory over limit. Fail the task and remove
+            // the corresponding process tree
             LOG.warn(msg);
             taskTracker.cleanUpOverMemoryTask(tid, true, msg);
-
             // Now destroy the ProcessTree, remove it from monitoring map.
             pTree.destroy(true/*in the background*/);
             it.remove();
@@ -237,6 +289,7 @@ class TaskMemoryManagerThread extends Th
             // Accounting the total memory in usage for all tasks that are still
             // alive and within limits.
             memoryStillInUsage += currentMemUsage;
+            rssMemoryStillInUsage += currentRssMemUsage;
           }
         } catch (Exception e) {
           // Log the exception and proceed to the next task.
@@ -246,13 +299,23 @@ class TaskMemoryManagerThread extends Th
         }
       }
 
-      if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
+      if (doCheckVirtualMemory() &&
+          memoryStillInUsage > maxMemoryAllowedForAllTasks) {
         LOG.warn("The total memory in usage " + memoryStillInUsage
             + " is still overflowing TTs limits "
             + maxMemoryAllowedForAllTasks
             + ". Trying to kill a few tasks with the least progress.");
         killTasksWithLeastProgress(memoryStillInUsage);
       }
+      
+      if (doCheckPhysicalMemory() &&
+          rssMemoryStillInUsage > maxRssMemoryAllowedForAllTasks) {
+        LOG.warn("The total physical memory in usage " + rssMemoryStillInUsage
+            + " is still overflowing TTs limits "
+            + maxRssMemoryAllowedForAllTasks
+            + ". Trying to kill a few tasks with the highest memory.");
+        killTasksWithMaxRssMemory(rssMemoryStillInUsage);
+      }
     
       // Sleep for some time before beginning next cycle
       try {
@@ -268,6 +331,22 @@ class TaskMemoryManagerThread extends Th
   }
 
   /**
+   * Is the total physical memory check enabled?
+   * @return true if total physical memory check is enabled.
+   */
+  private boolean doCheckPhysicalMemory() {
+    return !(maxRssMemoryAllowedForAllTasks == JobConf.DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Is the total virtual memory check enabled?
+   * @return true if total virtual memory check is enabled.
+   */
+  private boolean doCheckVirtualMemory() {
+    return !(maxMemoryAllowedForAllTasks == JobConf.DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
    * Check whether a task's process tree's current memory usage is over limit.
    * 
    * When a java process exec's a program, it could momentarily account for
@@ -332,8 +411,9 @@ class TaskMemoryManagerThread extends Th
     List<TaskAttemptID> tasksToExclude = new ArrayList<TaskAttemptID>();
     // Find tasks to kill so as to get memory usage under limits.
     while (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
-      // Exclude tasks that are already marked for
-      // killing.
+      // Exclude tasks that are already marked for killing.
+      // Note that we do not need to call isKillable() here because the logic
+      // is contained in taskTracker.findTaskToKill()
       TaskInProgress task = taskTracker.findTaskToKill(tasksToExclude);
       if (task == null) {
         break; // couldn't find any more tasks to kill.
@@ -360,14 +440,7 @@ class TaskMemoryManagerThread extends Th
                 + "the TaskTracker exceeds virtual memory limit "
                 + maxMemoryAllowedForAllTasks + ".";
         LOG.warn(msg);
-        // Kill the task and mark it as killed.
-        taskTracker.cleanUpOverMemoryTask(tid, false, msg);
-        // Now destroy the ProcessTree, remove it from monitoring map.
-        ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
-        ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
-        pTree.destroy(true/*in the background*/);
-        processTreeInfoMap.remove(tid);
-        LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
+        killTask(tid, msg);
       }
     } else {
       LOG.info("The total memory usage is overflowing TTs limits. "
@@ -375,4 +448,91 @@ class TaskMemoryManagerThread extends Th
     }
   }
 
+  /**
+   * Return the cumulative rss memory used by a task
+   * @param tid the task attempt ID of the task
+   * @return rss memory usage in bytes. 0 if the process tree is not available
+   */
+  private long getTaskCumulativeRssmem(TaskAttemptID tid) {
+      ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
+      ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+      return pTree == null ? 0 : pTree.getCumulativeRssmem();
+  }
+
+  /**
+   * Starting from the tasks that use the highest amount of RSS memory,
+   * kill tasks until the total RSS memory meets the requirement
+   * @param rssMemoryInUsage
+   */
+  private void killTasksWithMaxRssMemory(long rssMemoryInUsage) {
+    
+    List<TaskAttemptID> tasksToKill = new ArrayList<TaskAttemptID>();
+    List<TaskAttemptID> allTasks = new ArrayList<TaskAttemptID>();
+    allTasks.addAll(processTreeInfoMap.keySet());
+    // Sort the tasks in ascending order of RSS memory usage
+    Collections.sort(allTasks, new Comparator<TaskAttemptID>() {
+      public int compare(TaskAttemptID tid1, TaskAttemptID tid2) {
+        return  getTaskCumulativeRssmem(tid1) < getTaskCumulativeRssmem(tid2) ?
+                -1 : 1;
+      }});
+    
+    // Kill the tasks one by one until the memory requirement is met
+    while (rssMemoryInUsage > maxRssMemoryAllowedForAllTasks &&
+           !allTasks.isEmpty()) {
+      TaskAttemptID tid = allTasks.remove(allTasks.size() - 1);
+      if (!isKillable(tid)) {
+        continue;
+      }
+      long rssmem = getTaskCumulativeRssmem(tid);
+      if (rssmem == 0) {
+        break; // Skip tasks without process tree information currently
+      }
+      tasksToKill.add(tid);
+      rssMemoryInUsage -= rssmem;
+    }
+
+    // Now kill the tasks.
+    if (!tasksToKill.isEmpty()) {
+      for (TaskAttemptID tid : tasksToKill) {
+        String msg =
+            "Killing one of the memory-consuming tasks - " + tid
+                + ", as the cumulative RSS memory usage of all the tasks on "
+                + "the TaskTracker exceeds physical memory limit "
+                + maxRssMemoryAllowedForAllTasks + ".";
+        LOG.warn(msg);
+        killTask(tid, msg);
+      }
+    } else {
+      LOG.info("The total physical memory usage is overflowing TTs limits. "
+          + "But found no alive task to kill for freeing memory.");
+    }
+  }
+
+  /**
+   * Kill the task and clean up ProcessTreeInfo
+   * @param tid task attempt ID of the task to be killed.
+   * @param msg diagnostics message
+   */
+  private void killTask(TaskAttemptID tid, String msg) {
+    // Kill the task and mark it as killed.
+    taskTracker.cleanUpOverMemoryTask(tid, false, msg);
+    // Now destroy the ProcessTree, remove it from monitoring map.
+    ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
+    ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+    pTree.destroy(true/*in the background*/);
+    processTreeInfoMap.remove(tid);
+    LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
+  }
+
+  /**
+   * Check if a task can be killed to increase free memory
+   * @param tid task attempt ID
+   * @return true if the task can be killed
+   */
+  private boolean isKillable(TaskAttemptID tid) {
+      TaskInProgress tip = taskTracker.runningTasks.get(tid);
+      return tip != null && !tip.wasKilled() &&
+             (tip.getRunState() == TaskStatus.State.RUNNING ||
+              tip.getRunState() == TaskStatus.State.COMMIT_PENDING);
+  }
 }
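
Editor's note: taken together, the additions above implement a simple node-level policy: the physical-memory budget is the node's total physical memory minus the reserved amount, and when the tasks' combined RSS exceeds that budget, the largest consumers are killed first until usage fits. The sketch below is a condensed, standalone illustration of that selection logic (plain strings stand in for task attempt IDs, and no process trees are involved); it is not the committed method. Killing by highest RSS frees the most memory with the fewest kills, whereas the existing virtual-memory path keeps picking the task with the least progress.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    class RssOverBudgetSketch {
      /** Budget in bytes; a negative total or reserved value means "disabled". */
      static long nodeBudget(long totalPhysicalBytes, long reservedBytes) {
        if (totalPhysicalBytes < 0 || reservedBytes < 0) {
          return -1L; // physical-memory checking stays off
        }
        return totalPhysicalBytes - reservedBytes;
      }

      /** Pick the highest-RSS tasks to kill until total usage fits the budget. */
      static List<String> selectVictims(Map<String, Long> rssByTask, long budgetBytes) {
        long total = 0L;
        for (long rss : rssByTask.values()) {
          total += rss;
        }
        List<Map.Entry<String, Long>> tasks = new ArrayList<>(rssByTask.entrySet());
        tasks.sort((a, b) -> Long.compare(b.getValue(), a.getValue())); // largest first
        List<String> victims = new ArrayList<>();
        for (Map.Entry<String, Long> e : tasks) {
          if (total <= budgetBytes) {
            break;
          }
          victims.add(e.getKey());
          total -= e.getValue();
        }
        return victims;
      }
    }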

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=936166&r1=936165&r2=936166&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Apr 21 06:14:53 2010
@@ -267,6 +267,7 @@ public class TaskTracker 
   private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reservedPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
   // Manages job acls of jobs in TaskTracker
@@ -1722,6 +1723,15 @@ public class TaskTracker 
   }
 
   /**
+   * @return The amount of physical memory that will not be used for running
+   *         tasks in bytes. Returns JobConf.DISABLED_MEMORY_LIMIT if it is not
+   *         configured.
+   */
+  long getReservedPhysicalMemoryOnTT() {
+    return reservedPhysicalMemoryOnTT;
+  }
+
+  /**
    * Check if the jobtracker directed a 'reset' of the tasktracker.
    * 
    * @param actions the directives of the jobtracker for the tasktracker.
@@ -2231,11 +2241,25 @@ public class TaskTracker 
   
   void addToMemoryManager(TaskAttemptID attemptId, boolean isMap, 
                           JobConf conf) {
-    if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager.addTask(attemptId, isMap ? conf
-          .getMemoryForMapTask() * 1024 * 1024L : conf
-          .getMemoryForReduceTask() * 1024 * 1024L);
+    if (!isTaskMemoryManagerEnabled()) {
+      return; // Skip this if TaskMemoryManager is not enabled.
     }
+    // Obtain physical memory limits from the job configuration
+    long physicalMemoryLimit =
+      conf.getLong(isMap ? JobContext.MAP_MEMORY_PHYSICAL_MB :
+                   JobContext.REDUCE_MEMORY_PHYSICAL_MB,
+                   JobConf.DISABLED_MEMORY_LIMIT);
+    if (physicalMemoryLimit > 0) {
+      physicalMemoryLimit *= 1024L * 1024L;
+    }
+
+    // Obtain virtual memory limits from the job configuration
+    long virtualMemoryLimit = isMap ?
+      conf.getMemoryForMapTask() * 1024 * 1024 :
+      conf.getMemoryForReduceTask() * 1024 * 1024;
+
+    taskMemoryManager.addTask(attemptId, virtualMemoryLimit,
+                              physicalMemoryLimit);
   }
 
   void removeFromMemoryManager(TaskAttemptID attemptId) {
@@ -3821,6 +3845,14 @@ public class TaskTracker 
           + " Thrashing might happen.");
     }
 
+    reservedPhysicalMemoryOnTT =
+      fConf.getLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB,
+                    JobConf.DISABLED_MEMORY_LIMIT);
+    reservedPhysicalMemoryOnTT =
+      reservedPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT ?
+      JobConf.DISABLED_MEMORY_LIMIT :
+      reservedPhysicalMemoryOnTT * 1024 * 1024; // normalize to bytes
+
     // start the taskMemoryManager thread only if enabled
     setTaskMemoryManagerEnabledFlag();
     if (isTaskMemoryManagerEnabled()) {
@@ -3838,10 +3870,12 @@ public class TaskTracker 
       return;
     }
 
-    if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+    if (reservedPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT
+        && totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
       taskMemoryManagerEnabled = false;
-      LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1."
-          + " TaskMemoryManager is disabled.");
+      LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1 and " +
+               "reserved physical memory is not configured. " +
+               "TaskMemoryManager is disabled.");
       return;
     }
 

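Editor's note: the tasktracker-side plumbing above follows one units-and-sentinel convention: limits are configured in MB, converted to bytes only when positive, and a negative value (JobConf.DISABLED_MEMORY_LIMIT) means the corresponding check is simply skipped. A small sketch of that pattern; the class and method names are invented for illustration.

    class MemoryLimitUnitsSketch {
      static final long DISABLED = -1L; // stands in for JobConf.DISABLED_MEMORY_LIMIT

      /** Convert a configured MB value to bytes, preserving the "disabled" sentinel. */
      static long mbToBytesOrDisabled(long mb) {
        return mb > 0 ? mb * 1024L * 1024L : DISABLED;
      }

      public static void main(String[] args) {
        System.out.println(mbToBytesOrDisabled(512));  // 536870912
        System.out.println(mbToBytesOrDisabled(-1));   // -1, check disabled
      }
    }
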
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=936166&r1=936165&r2=936166&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Wed Apr 21 06:14:53 2010
@@ -156,6 +156,8 @@ public interface JobContext {
   public static final String MAP_INPUT_PATH = "mapreduce.map.input.length";
   public static final String MAP_INPUT_START = "mapreduce.map.input.start";
   public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+  public static final String MAP_MEMORY_PHYSICAL_MB =
+    "mapreduce.map.memory.physical.mb";
   public static final String MAP_ENV = "mapreduce.map.env";
   public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
   public static final String MAP_ULIMIT = "mapreduce.map.ulimit"; 
@@ -193,6 +195,8 @@ public interface JobContext {
     "mapreduce.reduce.markreset.buffer.percent";
   public static final String REDUCE_MARKRESET_BUFFER_SIZE = 
     "mapreduce.reduce.markreset.buffer.size";
+  public static final String REDUCE_MEMORY_PHYSICAL_MB = 
+    "mapreduce.reduce.memory.physical.mb";
   public static final String REDUCE_MEMORY_MB = 
     "mapreduce.reduce.memory.mb";
   public static final String REDUCE_MEMORY_TOTAL_BYTES = 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=936166&r1=936165&r2=936166&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Wed Apr 21 06:14:53 2010
@@ -85,6 +85,8 @@ public interface TTConfig extends MRConf
     "mapreduce.tasktracker.cache.local.numberdirectories";
   public static final String TT_OUTOFBAND_HEARBEAT =
     "mapreduce.tasktracker.outofband.heartbeat";
+  public static final String TT_RESERVED_PHYSCIALMEMORY_MB =
+    "mapreduce.tasktracker.reserved.physicalmemory.mb";
   public static final String TT_USER_NAME = "mapreduce.tasktracker.user.name";
   public static final String TT_KEYTAB_FILE = 
     "mapreduce.tasktracker.keytab.file";

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=936166&r1=936165&r2=936166&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Wed Apr 21 06:14:53 2010
@@ -34,9 +34,11 @@ import org.apache.hadoop.mapreduce.MRCon
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -55,7 +57,7 @@ public class TestTaskTrackerMemoryManage
   private MiniMRCluster miniMRCluster;
 
   private String taskOverLimitPatternString =
-      "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
+      "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond.*memory-limits. "
           + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
 
   private void startCluster(JobConf conf)
@@ -175,11 +177,16 @@ public class TestTaskTrackerMemoryManage
     JobConf fConf = new JobConf();
     fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024L);
     fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L);
+    // Reserve only 1 mb of the memory on TaskTrackers
+    fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB, 1L);
     startCluster(new JobConf());
 
     JobConf conf = new JobConf(miniMRCluster.createJobConf());
     conf.setMemoryForMapTask(PER_TASK_LIMIT);
     conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    // Set task physical memory limits
+    conf.setLong(JobContext.MAP_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
+    conf.setLong(JobContext.REDUCE_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
   }
 
@@ -204,7 +211,31 @@ public class TestTaskTrackerMemoryManage
     fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024);
     fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024);
     startCluster(fConf);
-    runJobExceedingMemoryLimit();
+    runJobExceedingMemoryLimit(false);
+  }
+  
+  /**
+   * Test for verifying that tasks that go beyond physical limits get killed.
+   * 
+   * @throws Exception
+   */
+  public void testTasksBeyondPhysicalLimits()
+      throws Exception {
+
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
+      return;
+    }
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+    // very small value, so that no task escapes to successful completion.
+    fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
+        String.valueOf(300));
+    // Reserve only 1 mb of the memory on TaskTrackers
+    fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB, 1L);
+    startCluster(fConf);
+    runJobExceedingMemoryLimit(true);
   }
   
   /**
@@ -234,15 +265,18 @@ public class TestTaskTrackerMemoryManage
     fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, 
         (3L * 1024L * 1024L * 1024L));
     startCluster(fConf);
-    runJobExceedingMemoryLimit();
+    runJobExceedingMemoryLimit(false);
   }
 
   /**
    * Runs a job which should fail the when run by the memory monitor.
    * 
+   * @param doPhysicalMemory If it is true, use physical memory limit.
+   *                         Otherwise use virtual memory limit.
    * @throws IOException
    */
-  private void runJobExceedingMemoryLimit() throws IOException {
+  private void runJobExceedingMemoryLimit(boolean doPhysicalMemory)
+    throws IOException {
     long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks.
 
     Pattern taskOverLimitPattern =
@@ -252,8 +286,13 @@ public class TestTaskTrackerMemoryManage
 
     // Set up job.
     JobConf conf = new JobConf(miniMRCluster.createJobConf());
-    conf.setMemoryForMapTask(PER_TASK_LIMIT);
-    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    if (doPhysicalMemory) {
+      conf.setLong(JobContext.MAP_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
+      conf.setLong(JobContext.REDUCE_MEMORY_PHYSICAL_MB, PER_TASK_LIMIT);
+    } else {
+      conf.setMemoryForMapTask(PER_TASK_LIMIT);
+      conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    }
     conf.setMaxMapAttempts(1);
     conf.setMaxReduceAttempts(1);
 
@@ -472,4 +511,87 @@ public class TestTaskTrackerMemoryManage
       FileUtil.fullyDelete(procfsRootDir);
     }
   }
+
+  /**
+   * Test for verifying that tasks causing cumulative usage of physical memory
+   * to go beyond TT's limit get killed.
+   *
+   * @throws Exception
+   */
+  public void testTasksCumulativelyExceedingTTPhysicalLimits()
+      throws Exception {
+
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
+      return;
+    }
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+
+    // very small value, so that no task escapes to successful completion.
+    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+        String.valueOf(300));
+    
+    // reserve all memory on TT so that the job will exceed memory limits
+    LinuxResourceCalculatorPlugin memoryCalculatorPlugin =
+            new LinuxResourceCalculatorPlugin();
+    long totalPhysicalMemory = memoryCalculatorPlugin.getPhysicalMemorySize();
+    long reservedPhysicalMemory = totalPhysicalMemory / (1024 * 1024) + 1;
+    fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB,
+                  reservedPhysicalMemory);
+    long maxRssMemoryAllowedForAllTasks = totalPhysicalMemory -
+                                          reservedPhysicalMemory * 1024 * 1024L;
+    Pattern physicalMemoryOverLimitPattern = Pattern.compile(
+        "Killing one of the memory-consuming tasks - .*"
+          + ", as the cumulative RSS memory usage of all the tasks on "
+          + "the TaskTracker exceeds physical memory limit "
+          + maxRssMemoryAllowedForAllTasks + ".");
+
+    startCluster(fConf);
+    Matcher mat = null;
+
+    // Set up job.
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    // Set per task physical memory limits to be a higher value
+    conf.setLong(JobContext.MAP_MEMORY_PHYSICAL_MB, 2 * 1024L);
+    conf.setLong(JobContext.REDUCE_MEMORY_PHYSICAL_MB, 2 * 1024L);
+    JobClient jClient = new JobClient(conf);
+    SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(conf);
+    // Start the job
+    Job job = sleepJob.createJob(1, 1, 100000, 1, 100000, 1);
+    job.submit();
+    boolean TTOverFlowMsgPresent = false;
+    while (true) {
+      List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      for (TaskReport tr : allTaskReports) {
+        String[] diag = tr.getDiagnostics();
+        for (String str : diag) {
+          mat = physicalMemoryOverLimitPattern.matcher(str);
+          if (mat.find()) {
+            TTOverFlowMsgPresent = true;
+          }
+        }
+      }
+      if (TTOverFlowMsgPresent) {
+        break;
+      }
+      assertFalse("Job should not finish successfully", job.isSuccessful());
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+    }
+    // If it comes here without a test-timeout, it means there was a task that
+    // was killed because of crossing cumulative TT limit.
+
+    // Test succeeded, kill the job.
+    job.killJob();
+  }
 }
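
Editor's note on the arithmetic in testTasksCumulativelyExceedingTTPhysicalLimits above: reserving totalPhysicalMemory / (1024 * 1024) + 1 MB makes the node budget (total minus reserved) strictly negative, so any task with a live process tree trips the cumulative physical-memory check. A small worked example, assuming an 8 GB node; the node size and class name are illustrative.

    public class ReserveAllMemoryMath {
      public static void main(String[] args) {
        long totalPhysical = 8L * 1024 * 1024 * 1024;            // assumed node size: 8 GB
        long reservedMb = totalPhysical / (1024 * 1024) + 1;     // 8193 MB
        long budget = totalPhysical - reservedMb * 1024 * 1024L; // -1048576 bytes (-1 MB)
        System.out.println(budget < 0);                          // true: budget is negative
      }
    }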


