hadoop-mapreduce-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1079194 - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/jobhistory/
Date Tue, 08 Mar 2011 05:54:21 GMT
Author: omalley
Date: Tue Mar  8 05:54:20 2011
New Revision: 1079194

URL: http://svn.apache.org/viewvc?rev=1079194&view=rev
Log:
commit e808cce90f4cbf79321d7f6c06084fa88e102c37
Author: Greg Roelofs <roelofs@yahoo-inc.com>
Date:   Wed Dec 1 19:57:04 2010 -0800

    "Final," ready-for-checkin (I think), v6 patch.  This one includes the
    memory criterion in the uber-decision, as well as corresponding cleanup,
    and removal of all the "GRR" comments.
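
    For orientation, a minimal standalone sketch (not the committed code; the class
    name, method name, and sentinel value below are illustrative assumptions) of the
    memory criterion that the JobInProgress.java change adds to the uber-decision:
    a job is only uberized if its largest per-task memory request fits in the reduce
    slot that will host the UberTask, or if memory limits are disabled.

    public class UberMemoryCriterionSketch {
      // assumed sentinel meaning "memory limits not configured",
      // playing the role of JobConf.DISABLED_MEMORY_LIMIT
      static final long DISABLED_MEMORY_LIMIT = -1L;

      /**
       * True if a job whose maps/reduces request the given memory (MB) can run
       * inside a single reduce slot of the given size (MB).  UberTask framework
       * overhead is ignored as negligible, per the patch comment.
       */
      static boolean fitsInUberSlot(long memoryPerMap, long memoryPerReduce,
                                    long memSizeForUberSlot) {
        return memSizeForUberSlot == DISABLED_MEMORY_LIMIT
            || Math.max(memoryPerMap, memoryPerReduce) <= memSizeForUberSlot;
      }

      public static void main(String[] args) {
        System.out.println(fitsInUberSlot(1024, 2048, 1536));                  // false: reduces too big
        System.out.println(fitsInUberSlot(1024, 1024, 1536));                  // true
        System.out.println(fitsInUberSlot(1024, 2048, DISABLED_MEMORY_LIMIT)); // true: limits disabled
      }
    }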

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java Tue Mar  8 05:54:20 2011
@@ -292,10 +292,10 @@ class JobQueue extends AbstractQueue {
 
 
   /**
-   * Setup {@link CapacityTaskScheduler} specific information prior to
+   * Set up {@link CapacityTaskScheduler}-specific information prior to
    * job initialization.
    * <p/>
-   * TO DO: Currently this method uses , CapacityTaskScheduler based variables
+   * TO DO: Currently this method uses CapacityTaskScheduler-based variables;
    * need to shift those.
    */
   void preInitializeJob(JobInProgress job) {
@@ -306,9 +306,8 @@ class JobQueue extends AbstractQueue {
     int slotsPerReduce = 1;
     if (MemoryMatcher.isSchedulingBasedOnMemEnabled()) {
       slotsPerMap = jobConf.computeNumSlotsPerMap(
-        MemoryMatcher.getMemSizeForMapSlot());
-      slotsPerReduce =
-        jobConf.computeNumSlotsPerReduce(
+          MemoryMatcher.getMemSizeForMapSlot());
+      slotsPerReduce = jobConf.computeNumSlotsPerReduce(
           MemoryMatcher.getMemSizeForReduceSlot());
     }
     job.setNumSlotsPerMap(slotsPerMap);

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue Mar  8 05:54:20 2011
@@ -28,13 +28,20 @@ import org.apache.hadoop.conf.Configurat
 class MemoryMatcher {
 
   private static final Log LOG = LogFactory.getLog(MemoryMatcher.class);
-  static long memSizeForMapSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT;
-  static long memSizeForReduceSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT;
-  static long limitMaxMemForMapTasks = JobConf.DISABLED_MEMORY_LIMIT;
-  static long limitMaxMemForReduceTasks = JobConf.DISABLED_MEMORY_LIMIT;
+  private static JobTracker.MemoryLimits schedulerMemLimits =
+      new JobTracker.MemoryLimits();
 
 
   public MemoryMatcher() {
+    // initialize memory limits using official JobTracker values
+    schedulerMemLimits.setMemSizeForMapSlot(
+        JobTracker.getMemSizeForMapSlot());
+    schedulerMemLimits.setMemSizeForReduceSlot(
+        JobTracker.getMemSizeForReduceSlot());
+    schedulerMemLimits.setMaxMemForMapTasks(
+        JobTracker.getLimitMaxMemForMapTasks());
+    schedulerMemLimits.setMaxMemForReduceTasks(
+        JobTracker.getLimitMaxMemForReduceTasks());
   }
 
   /**
@@ -133,17 +140,7 @@ class MemoryMatcher {
   }
 
   static boolean isSchedulingBasedOnMemEnabled() {
-    if (getLimitMaxMemForMapSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT
-        || getLimitMaxMemForReduceSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT
-        || getMemSizeForMapSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT
-        || getMemSizeForReduceSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT) {
-      return false;
-    }
-    return true;
+    return schedulerMemLimits.isMemoryConfigSet();
   }
 
   public static void initializeMemoryRelatedConf(Configuration conf) {
@@ -164,72 +161,36 @@ class MemoryMatcher {
           CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY));
     }
 
-    if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
-    }
-
-    memSizeForMapSlotOnJT =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
-            MRConfig.MAPMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT));
-    memSizeForReduceSlotOnJT =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
-            MRConfig.REDUCEMEMORY_MB,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    //handling @deprecated values
-    if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
-          " instead use " + JTConfig.JT_MAX_MAPMEMORY_MB +
-          " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB
-      );
-
-      limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-      if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
-        limitMaxMemForMapTasks >= 0) {
-        limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
-          limitMaxMemForMapTasks /
-            (1024 * 1024); //Converting old values in bytes to MB
-      }
-    } else {
-      limitMaxMemForMapTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JTConfig.JT_MAX_MAPMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT));
-      limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JTConfig.JT_MAX_REDUCEMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT));
-    }
-    LOG.info(String.format("Scheduler configured with "
-        + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
-        + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
-        + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
-        .valueOf(memSizeForReduceSlotOnJT), Long
-        .valueOf(limitMaxMemForMapTasks), Long
-        .valueOf(limitMaxMemForReduceTasks)));
+    // Our local schedulerMemLimits instance and everything from here to the
+    // end of the file is really just for TestCapacityScheduler, which assumes
+    // it can set weird values and have them automatically reinitialized prior
+    // to the subsequent test.  Otherwise the official JobTracker versions
+    // would suffice.
+
+    JobTracker.initializeMemoryRelatedConfig(conf, schedulerMemLimits);
+
+    LOG.info(new StringBuilder().append("Scheduler configured with ")
+        .append("(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,")
+        .append(" limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (")
+        .append(getMemSizeForMapSlot()).append(", ")
+        .append(getMemSizeForReduceSlot()).append(", ")
+        .append(getLimitMaxMemForMapSlot()).append(", ")
+        .append(getLimitMaxMemForReduceSlot()).append(")"));
   }
 
-  static long  getMemSizeForMapSlot() {
-    return memSizeForMapSlotOnJT;
+  static long getMemSizeForMapSlot() {
+    return schedulerMemLimits.getMemSizeForMapSlot();
   }
 
   static long getMemSizeForReduceSlot() {
-    return memSizeForReduceSlotOnJT;
+    return schedulerMemLimits.getMemSizeForReduceSlot();
   }
 
   static long getLimitMaxMemForMapSlot() {
-    return limitMaxMemForMapTasks;
+    return schedulerMemLimits.getMaxMemForMapTasks();
   }
 
   static long getLimitMaxMemForReduceSlot() {
-    return limitMaxMemForReduceTasks;
+    return schedulerMemLimits.getMaxMemForReduceTasks();
   }
 }
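
    As a side note on the refactoring above: the four formerly separate static longs
    now live in one shared JobTracker.MemoryLimits holder that both the JobTracker
    and this scheduler consult.  A rough, self-contained sketch of that holder
    pattern (simplified; class name and sentinel value here are assumptions, not the
    committed class):

    public class MemoryLimitsSketch {
      static final long DISABLED = -1L;   // assumed sentinel for "not configured"

      static class MemoryLimits {
        long memSizeForMapSlot = DISABLED;
        long memSizeForReduceSlot = DISABLED;
        long maxMemForMapTasks = DISABLED;
        long maxMemForReduceTasks = DISABLED;

        // memory-based scheduling is only enabled once all four values are set
        boolean isMemoryConfigSet() {
          return memSizeForMapSlot != DISABLED
              && memSizeForReduceSlot != DISABLED
              && maxMemForMapTasks != DISABLED
              && maxMemForReduceTasks != DISABLED;
        }
      }

      public static void main(String[] args) {
        MemoryLimits limits = new MemoryLimits();
        System.out.println(limits.isMemoryConfigSet());   // false: nothing configured
        limits.memSizeForMapSlot = 1024;
        limits.memSizeForReduceSlot = 2048;
        limits.maxMemForMapTasks = 4096;
        limits.maxMemForReduceTasks = 8192;
        System.out.println(limits.isMemoryConfigSet());   // true
      }
    }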

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java Tue Mar  8 05:54:20 2011
@@ -258,13 +258,17 @@ class JSPUtil {
   /**
    * Method used to generate the Job table for Job pages.
    * 
-   * @param label display heading to be used in the job table. [GRR FIXME: not used for heading! jobtracker.jsp handles separately--would be better to pass in enum]
+   * @param label display heading to be used in the job table.
    * @param jobs vector of jobs to be displayed in table.
    * @param refresh refresh interval to be used in jobdetails page.
    * @param rowId beginning row id to be used in the table.
    * @return
    * @throws IOException
    */
+  /*
+   * FIXME:  Label is not used for the table heading; jobtracker.jsp handles
+   *         that separately.  Would be better to pass in an enum.
+   */
   public static String generateJobTable(String label,
       Collection<JobInProgress> jobs, int refresh, int rowId, JobConf conf)
       throws IOException {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  8 05:54:20 2011
@@ -513,17 +513,13 @@ public class JobInProgress {
                                               int maxCacheLevel) {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxCacheLevel);
-//GRR PERF FIXME:  wtf??  claiming hash size is maxCacheLevel == 2(!)
-//   potentially:  size == num splits * num split locations * numlevels (if every split location is on a separate host)...hmmm, possible for same split location to show up in more than 1 level?
     
-//GRR Q:  what are "splits" in this context?  if text file, does every 10,000 lines (or whatever) in same block constitute a split?
     for (int i = 0; i < splits.length; i++) {
       // originally UberTask was a flavor of MapTask (=> maps[0]), but the
       // current design makes it a flavor of ReduceTask instead (=> reduces[0])
       TaskInProgress map = uberMode? reduces[0] : maps[i];
       String[] splitLocations = splits[i].getLocations();
       if (splitLocations.length == 0) {
-//GRR Q:  don't understand this:  if given MapTask's split has no locations => non-local? why not fail immediately if no data?  or treat as local since doesn't matter where? (or do we want to avoid contention for node that might have local data for some other map?)
         nonLocalMaps.add(map);
         continue;
       }
@@ -676,6 +672,7 @@ public class JobInProgress {
         Math.min(1, sysConf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1));
     long sysMaxBytes = sysConf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
         sysConf.getLong("dfs.block.size", 64*1024*1024));
+    long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
 
     // user has overall veto power over uberization, or user can set more
     // stringent limits than the system specifies, but user may not exceed
@@ -687,14 +684,15 @@ public class JobInProgress {
         && numReduceTasks <= Math.min(sysMaxReduces,
             conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, sysMaxReduces))
         && inputLength <= Math.min(sysMaxBytes,
-            conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, sysMaxBytes));
+            conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, sysMaxBytes))
+        // ignoring overhead due to UberTask and statics as negligible here:
+        && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
+            || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
 
     if (uberMode) {
       // save internal details for UI
       uberMapTasks = numMapTasks;
       uberReduceTasks = numReduceTasks;
-      //GRR FIXME:  check jobSetupCleanupNeeded and set "uberSetupTasks" and
-      //            "uberCleanupTasks" for UI, too?
 
       // this method modifies numMapTasks (-> 0), numReduceTasks (-> 1), and
       // jobSetupCleanupNeeded (-> false)  [and actually creates a TIP, not a
@@ -752,7 +750,7 @@ public class JobInProgress {
 
     tasksInited.set(true);
 
-//GRR Q:  what's difference between profile.getJobID() and jobId ?
+    // possible FIXME:  if profile.getJobID() and jobId are same thing, pick one
     JobInitedEvent jie = new JobInitedEvent(
         profile.getJobID(), launchTime, numMapTasks, numReduceTasks,
         uberMode, uberMapTasks, uberReduceTasks,
@@ -858,7 +856,6 @@ public class JobInProgress {
                                     this, numSlotsPerReduce,
                                     jobSetupCleanupNeeded);
     nonRunningReduces.add(reduces[0]);  // note:  no map analogue
-//GRR FIXME?  any conditions under which numSlotsPerReduce would not be correct thing to use for an UberTask (running in a reduce slot)?
     numMapTasks = 0;
     numReduceTasks = 1;
     jobSetupCleanupNeeded = false;
@@ -2078,7 +2075,6 @@ public class JobInProgress {
     if (nonRunningMapCache == null) {
       LOG.warn("Non-running cache for maps missing!! "
                + "Job details are missing.");
-//GRR Q:  why not throwing exception or otherwise killing self and/or job?  seems like logic error...or is this expected sometimes?
       return;
     }
 
@@ -2300,7 +2296,6 @@ public class JobInProgress {
 
     // We fall to linear scan of the list (III above) if we have misses in the 
     // above caches
-//GRR FIXME?  within _this_ function?  not seeing it in the code...
 
     Node node = jobtracker.getNode(tts.getHost());
     
@@ -2320,7 +2315,7 @@ public class JobInProgress {
       // findNewMapTask is to schedule only off-switch/speculative tasks
       int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
       for (level = 0; level < maxLevelToSchedule; ++level) {
-        List <TaskInProgress> cacheForLevel /* a.k.a. hostMap */ = nonRunningMapCache.get(key);
+        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
         if (cacheForLevel != null) {
           // this may remove a TIP from cacheForLevel (but not necessarily tip):
           tip = findTaskFromList(cacheForLevel, tts,
@@ -2330,7 +2325,6 @@ public class JobInProgress {
             scheduleMap(tip);
 
             // remove the cache if it's empty
-//GRR FIXME:  findTaskFromList() may return null even though it removed a tip => might be empty anyway?  (seems like this if-block should move up)
             if (cacheForLevel.size() == 0) {
               nonRunningMapCache.remove(key);
             }
@@ -2345,8 +2339,6 @@ public class JobInProgress {
       if (level == maxCacheLevel) {
         return -1;
       }
-//GRR FIXME:  seems like check should be against maxLevelToSchedule:  if user specifies local (maxLevel == 0 or 1) but TT specifies any (maxCacheLevel == maxLevel+1), then maxLevelToSchedule == maxLevel, level == maxLevel == maxLevelToSchedule, and level != maxCacheLevel   => fall through to non-local
-//            [can this happen, or must TT value match user's setting?  presumably can, else would (should) use same variable for both...]
     }
 
     //2. Search breadth-wise across parents at max level for non-running 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Mar  8 05:54:20 2011
@@ -1333,10 +1333,7 @@ public class JobTracker implements MRCon
 
   private final ACLsManager aclsManager;
 
-  long limitMaxMemForMapTasks;
-  long limitMaxMemForReduceTasks;
-  long memSizeForMapSlotOnJT;
-  long memSizeForReduceSlotOnJT;
+  private static MemoryLimits jtMemLimits = new MemoryLimits();
 
   private final QueueManager queueManager;
 
@@ -4370,23 +4367,76 @@ public class JobTracker implements MRCon
           .getQueueRefresher());
     }
   }
-  
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static class MemoryLimits {
+    private long memSizeForMapSlot = JobConf.DISABLED_MEMORY_LIMIT;
+    private long memSizeForReduceSlot = JobConf.DISABLED_MEMORY_LIMIT;
+    private long maxMemForMapTasks = JobConf.DISABLED_MEMORY_LIMIT;
+    private long maxMemForReduceTasks = JobConf.DISABLED_MEMORY_LIMIT;
+
+    public long getMemSizeForMapSlot()    { return memSizeForMapSlot; }
+    public long getMemSizeForReduceSlot() { return memSizeForReduceSlot; }
+    public long getMaxMemForMapTasks()    { return maxMemForMapTasks; }
+    public long getMaxMemForReduceTasks() { return maxMemForReduceTasks; }
+
+    public void setMemSizeForMapSlot(long mapMemSize) {
+      memSizeForMapSlot = mapMemSize;
+    }
+    public void setMemSizeForReduceSlot(long reduceMemSize) {
+      memSizeForReduceSlot = reduceMemSize;
+    }
+    public void setMaxMemForMapTasks(long maxMapMem) {
+      maxMemForMapTasks = maxMapMem;
+    }
+    public void setMaxMemForReduceTasks(long maxReduceMem) {
+      maxMemForReduceTasks = maxReduceMem;
+    }
+
+    public boolean isMemoryConfigSet() {
+      return (getMemSizeForMapSlot() != JobConf.DISABLED_MEMORY_LIMIT
+              && getMemSizeForReduceSlot() != JobConf.DISABLED_MEMORY_LIMIT
+              && getMaxMemForMapTasks() != JobConf.DISABLED_MEMORY_LIMIT
+              && getMaxMemForReduceTasks() != JobConf.DISABLED_MEMORY_LIMIT);
+    }
+  }
+
   private void initializeTaskMemoryRelatedConfig() {
-    memSizeForMapSlotOnJT =
+    initializeMemoryRelatedConfig(conf, jtMemLimits);
+    LOG.info(new StringBuilder().append("JobTracker configured with ")
+        .append("(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,")
+        .append(" limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (")
+        .append(jtMemLimits.getMemSizeForMapSlot()).append(", ")
+        .append(jtMemLimits.getMemSizeForReduceSlot()).append(", ")
+        .append(jtMemLimits.getMaxMemForMapTasks()).append(", ")
+        .append(jtMemLimits.getMaxMemForReduceTasks()).append(")"));
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static void initializeMemoryRelatedConfig(Configuration conf,
+                                                   MemoryLimits memLimits) {
+    memLimits.setMemSizeForMapSlot(
         JobConf.normalizeMemoryConfigValue(conf.getLong(
-            MAPMEMORY_MB,
-            JobConf.DISABLED_MEMORY_LIMIT));
-    memSizeForReduceSlotOnJT =
+            MRConfig.MAPMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT)));
+    memLimits.setMemSizeForReduceSlot(
         JobConf.normalizeMemoryConfigValue(conf.getLong(
-            REDUCEMEMORY_MB,
-            JobConf.DISABLED_MEMORY_LIMIT));
+            MRConfig.REDUCEMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT)));
+
+    // handle @deprecated values
+    if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
+    }
+
+    long limitMaxMemForMapTasks, limitMaxMemForReduceTasks;
 
     if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
       LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
-          " instead use "+JTConfig.JT_MAX_MAPMEMORY_MB+
-          " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB
+        JobConf.deprecatedString(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)
+        + " instead use " + JTConfig.JT_MAX_MAPMEMORY_MB
+        + " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB
       );
 
       limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
@@ -4413,12 +4463,32 @@ public class JobTracker implements MRCon
             JobConf.DISABLED_MEMORY_LIMIT));
     }
 
-    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
-        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
-        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(
-        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
-        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
-            limitMaxMemForReduceTasks).append(")"));
+    memLimits.setMaxMemForMapTasks(limitMaxMemForMapTasks);
+    memLimits.setMaxMemForReduceTasks(limitMaxMemForReduceTasks);
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static public long getMemSizeForMapSlot() {
+    return jtMemLimits.getMemSizeForMapSlot();
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static public long getMemSizeForReduceSlot() {
+    return jtMemLimits.getMemSizeForReduceSlot();
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static public long getLimitMaxMemForMapTasks() {
+    return jtMemLimits.getMaxMemForMapTasks();
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static public long getLimitMaxMemForReduceTasks() {
+    return jtMemLimits.getMaxMemForReduceTasks();
   }
 
   @Override
@@ -4435,16 +4505,6 @@ public class JobTracker implements MRCon
     
     Groups.getUserToGroupsMappingService().refresh();
   }
-  
-  private boolean perTaskMemoryConfigurationSetOnJT() {
-    if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
-        || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
-        || memSizeForMapSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT
-        || memSizeForReduceSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT) {
-      return false;
-    }
-    return true;
-  }
 
   /**
    * Check the job if it has invalid requirements and throw and IOException if does have.
@@ -4454,7 +4514,7 @@ public class JobTracker implements MRCon
    */
   void checkMemoryRequirements(JobInProgress job)
       throws IOException {
-    if (!perTaskMemoryConfigurationSetOnJT()) {
+    if (!jtMemLimits.isMemoryConfigSet()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Per-Task memory configuration is not set on JT. "
                   + "Not checking the job for invalid memory requirements.");
@@ -4473,8 +4533,8 @@ public class JobTracker implements MRCon
       msg = "Invalid job requirements.";
     }
 
-    if (maxMemForMapTask > limitMaxMemForMapTasks
-        || maxMemForReduceTask > limitMaxMemForReduceTasks) {
+    if (maxMemForMapTask > getLimitMaxMemForMapTasks()
+        || maxMemForReduceTask > getLimitMaxMemForReduceTasks()) {
       invalidJob = true;
       msg = "Exceeds the cluster's max-memory-limit.";
     }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java Tue Mar  8 05:54:20 2011
@@ -322,7 +322,7 @@ class MapTask extends Task {
     throws IOException, ClassNotFoundException, InterruptedException {
     this.umbilical = umbilical;
 
-    if (isMapTask()) {   //GRR Q: why is this conditional here? ALWAYS true (unless someone derives from MapTask and overrides isMapTask() but not run())
+    if (isMapTask()) {
       mapPhase = getProgress().addPhase("map", 0.667f);
       sortPhase = getProgress().addPhase("sort", 0.333f);
     }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 05:54:20 2011
@@ -474,7 +474,7 @@ abstract public class Task implements Wr
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;
 
-  private transient Progress taskProgress = new Progress(); //GRR Q:  why transient?  lose entire tree every time serialize??
+  private transient Progress taskProgress = new Progress();
 
   // Current counters
   private transient Counters counters = new Counters();
@@ -930,7 +930,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        //GRR FIXME (later):  alternatives to taskIdForUmbilical would be
+        // FIXME (later):  alternatives to taskIdForUmbilical would be
         // (1) include taskId as part of umbilical object and protocol;
         // (2) include taskId as part of taskStatus
         // (3) extend TaskAttemptID (or create related Task inner class?) to

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  8 05:54:20 2011
@@ -969,8 +969,9 @@ class TaskInProgress {
    * Get the split locations
    */
   public String[] getSplitLocations() {
-//GRR FIXME?  may need to add "(  ..  || isUberTask())" if ever called for uber (but locations for which split?  all of them?)
-    if (isMapTask() && !jobSetup && !jobCleanup) {
+    // last condition is redundant, but wanted to call out explicitly that
+    // UberTasks don't expose split locations even if they contain MapTasks:
+    if (isMapTask() && !jobSetup && !jobCleanup && !isUberTask()) {
       return splitInfo[0].getLocations();
     }
     return new String[0];
@@ -1236,8 +1237,6 @@ class TaskInProgress {
     return t;
   }
 
-  // GRR FIXME?  more efficient just to pass splitInfo directly...any need for
-  //             rest of it in UberTask?
   TaskSplitIndex[] getSplitIndexArray() {
     int numSplits = splitInfo.length;
     TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits];

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar  8 05:54:20 2011
@@ -544,9 +544,9 @@ public abstract class TaskStatus impleme
   
   static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
   throws IOException {
-/* LATER
- *  //GRR FIXME:  longer-term, just store tsType as member var (but then need
- *  //            to modify or add new ctor:  used in many places)
+/*
+ *  // FIXME (LATER):  just store tsType as member var (but then need
+ *  //                 to modify or add new ctor:  used in many places)
  *  Type tsType = taskStatus.getIsUber()
  *      ? TaskStatus.Type.UBER
  *      : taskStatus.getIsMap()

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  8 05:54:20 2011
@@ -3212,7 +3212,7 @@ public class TaskTracker 
     
     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
     if (!jvmManager.isJvmKnown(jvmId)) {
-      LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME:  bug?  no (apparent) killing
going on here...
+      LOG.info("Killing unknown JVM " + jvmId); // FIXME:  cut-and-paste bug? (no apparent
killing going on here... maybe "Ignoring unknown JVM ID"?)
       return new JvmTask(null, true);
     }
     RunningJob rjob = runningJobs.get(jvmId.getJobId());

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar  8 05:54:20 2011
@@ -213,7 +213,6 @@ class UberTask extends Task {
     return mapIds;
   }
 
-//GRR PERF TODO: make sure not crossing disk boundaries (optimization: just renaming map outputs to reduce inputs without doing in-memory/disk-spill shuffle thing)
   private void renameMapOutputForReduce(TaskAttemptID mapId,
                                         MapOutputFile subMapOutputFile)
   throws IOException {
@@ -299,7 +298,8 @@ class UberTask extends Task {
       reporter.progress();
 
       // every map will produce file.out (in the same dir), so rename as we go
-      if (numReduceTasks > 0) {  //GRR FIXME:  is conditionalized approach a behavior change?  do map-only jobs produce file.out or map_#.out or nothing at all?  if not renamed here, do we suddenly lose data that we used to preserve?
+      // (longer-term, will use TaskAttemptIDs as part of name => avoid rename)
+      if (numReduceTasks > 0) {
         renameMapOutputForReduce(mapIds[j], map.getMapOutputFile());
       }
     }
@@ -391,7 +391,7 @@ class UberTask extends Task {
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    if (isMapOrReduce()) {  //GRR Q:  why would this ever NOT be true?
+    if (isMapOrReduce()) {
       out.writeBoolean(jobSetupCleanupNeeded);
       WritableUtils.writeVInt(out, numMapTasks);
       WritableUtils.writeVInt(out, numReduceTasks);
@@ -405,7 +405,7 @@ class UberTask extends Task {
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    if (isMapOrReduce()) {  //GRR Q:  why would this ever NOT be true?
+    if (isMapOrReduce()) {
       jobSetupCleanupNeeded = in.readBoolean();
       numMapTasks = WritableUtils.readVInt(in);
       numReduceTasks = WritableUtils.readVInt(in);

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java Tue Mar  8 05:54:20 2011
@@ -50,7 +50,8 @@ public class UberTaskRunner extends Task
 
   @Override
   public Level getLogLevel(JobConf jobConf) {
-    //GRR FIXME?  what if        !=  JobConf.MAPRED_MAP_TASK_LOG_LEVEL ?
+    // note that, if JobConf.MAPRED_MAP_TASK_LOG_LEVEL isn't same as REDUCE
+    // setting, sub-MapTasks will "do the wrong thing" (should be very rare)
     return Level.toLevel(jobConf.get(JobConf.MAPRED_REDUCE_TASK_LOG_LEVEL,
                                      JobConf.DEFAULT_LOG_LEVEL.toString()));
   }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1079194&r1=1079193&r2=1079194&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Mar  8 05:54:20 2011
@@ -377,7 +377,6 @@ public class JobHistoryParser {
       System.out.println("PRIORITY: " + priority);
       System.out.println("TOTAL_MAPS: " + totalMaps);
       System.out.println("TOTAL_REDUCES: " + totalReduces);
-      //GRR FIXME:  add UBER_SUBMAPS and UBER_SUBREDUCES? (or only if isUber == true? coordinate with TaskInfo printAll() changes)
       System.out.println("MAP_COUNTERS:" + mapCounters.toString());
       System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
       System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());


