hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1079192 [1/2] - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ contrib/mumak/src/java/org/apache/hadoop/mapred/ java/ ja...
Date Tue, 08 Mar 2011 05:54:04 GMT
Author: omalley
Date: Tue Mar  8 05:54:02 2011
New Revision: 1079192

URL: http://svn.apache.org/viewvc?rev=1079192&view=rev
Log:
commit d1002eb7950bedbfbf7c2b9ef73b7f742a0d6eae
Author: Greg Roelofs <roelofs@yahoo-inc.com>
Date:   Tue Nov 30 19:30:09 2010 -0800

    Revert "Complete MR-1220.v5 port from Apache trunk to current y-trunk."
    
    This reverts commit 3e09d84186c82241f08644c46756f14209c1b681.

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
    hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp
    hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Tue Mar  8 05:54:02 2011
@@ -499,17 +499,17 @@ public class CapacityTestUtils {
     boolean hasSpeculativeMap;
     boolean hasSpeculativeReduce;
 
-    FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
-                       boolean isMap, FakeJobInProgress job, 
-                       JobSplit.TaskSplitMetaInfo split) {
+    FakeTaskInProgress(
+      JobID jId, JobConf jobConf, Task t,
+      boolean isMap, FakeJobInProgress job, 
+      JobSplit.TaskSplitMetaInfo split) {
       super(jId, "", split, null, jobConf, job, 0, 1);
       this.isMap = isMap;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();
       activeTasks.put(t.getTaskID(), "tt");
       // create a fake status for a task that is running for a bit
-      this.taskStatus = TaskStatus.createTaskStatus(
-          isMap? TaskStatus.Type.MAP : TaskStatus.Type.REDUCE);
+      this.taskStatus = TaskStatus.createTaskStatus(isMap);
       taskStatus.setProgress(0.5f);
       taskStatus.setRunState(TaskStatus.State.RUNNING);
       if (jobConf.getMapSpeculativeExecution()) {
@@ -862,11 +862,6 @@ public class CapacityTestUtils {
         }
 
         @Override
-        public boolean getIsUber() {
-          return t.isUberTask();
-        }
-
-        @Override
         public int getNumSlots() {
           return t.getNumSlotsRequired();
         }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Mar  8 05:54:02 2011
@@ -242,7 +242,7 @@ public class TestFairScheduler extends T
       this.fakeJob = job;
       this.inputLocations = inputLocations;
       activeTasks = new TreeMap<TaskAttemptID, String>();
-      taskStatus = TaskStatus.createTaskStatus(TaskStatus.Type.MAP);
+      taskStatus = TaskStatus.createTaskStatus(isMap);
       taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
     }
 
@@ -253,7 +253,7 @@ public class TestFairScheduler extends T
       this.isMap = false;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();
-      taskStatus = TaskStatus.createTaskStatus(TaskStatus.Type.REDUCE);
+      taskStatus = TaskStatus.createTaskStatus(isMap);
       taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
     }
     

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Tue Mar  8 05:54:02 2011
@@ -130,7 +130,8 @@ public class SimulatorJobInProgress exte
     createMapTasks(jobFile, taskSplitMetaInfo);
 
     if (numMapTasks > 0) {
-      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
+      nonRunningMapCache = createCache(taskSplitMetaInfo,
+          maxLevel);
       if (loggingEnabled) {
         LOG.debug("initTasks:numMaps=" + numMapTasks
             + " Size of nonRunningMapCache=" + nonRunningMapCache.size()

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml Tue Mar  8 05:54:02 2011
@@ -556,46 +556,6 @@
 </property>
 
 <property>
-  <name>mapreduce.job.ubertask.enable</name>
-  <value>true</value>
-  <description>Whether to enable the small-jobs "ubertask" optimization,
-  which runs "sufficiently small" jobs sequentially within a single JVM.
-  "Small" is defined by the following maxmaps, maxreduces, and maxbytes
-  settings.  Users may override this value.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.job.ubertask.maxmaps</name>
-  <value>9</value>
-  <description>Threshold for number of maps, beyond which job is considered
-  too big for the ubertasking optimization.  Users may override this value,
-  but only downward.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.job.ubertask.maxreduces</name>
-  <value>1</value>
-  <description>Threshold for number of reduces, beyond which job is considered
-  too big for the ubertasking optimization.  CURRENTLY THE CODE CANNOT SUPPORT
-  MORE THAN ONE REDUCE and will ignore larger values.  (Zero is a valid max,
-  however.)  Users may override this value, but only downward.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.job.ubertask.maxbytes</name>
-  <value></value>
-  <description>Threshold for number of input bytes, beyond which job is
-  considered too big for the ubertasking optimization.  If no value is
-  specified, dfs.block.size is used as a default.  Be sure to specify a
-  default value in mapred-site.xml if the underlying filesystem is not HDFS.
-  Users may override this value, but only downward.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>
   <description>The minimum size chunk that map input should be split

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JSPUtil.java Tue Mar  8 05:54:02 2011
@@ -57,10 +57,6 @@ class JSPUtil {
 
   private static final Log LOG = LogFactory.getLog(JSPUtil.class);
 
-  private static final String RED    = "#ffddcc";  // pale red/pink
-  private static final String YELLOW = "#ffffcc";  // pale yellow
-  private static final String GREEN  = "#ddffcc";  // pale green
-
   /**
    * Wraps the {@link JobInProgress} object and contains boolean for
    * 'job view access' allowed or not.
@@ -258,32 +254,21 @@ class JSPUtil {
   /**
    * Method used to generate the Job table for Job pages.
    * 
-   * @param label display heading to be used in the job table. [GRR FIXME: not used for heading! jobtracker.jsp handles separately--would be better to pass in enum]
+   * @param label display heading to be used in the job table.
    * @param jobs vector of jobs to be displayed in table.
    * @param refresh refresh interval to be used in jobdetails page.
    * @param rowId beginning row id to be used in the table.
    * @return
    * @throws IOException
    */
-  public static String generateJobTable(String label,
-      Collection<JobInProgress> jobs, int refresh, int rowId, JobConf conf)
-      throws IOException {
+  public static String generateJobTable(String label, Collection<JobInProgress> jobs
+      , int refresh, int rowId, JobConf conf) throws IOException {
 
-    boolean isRunning = label.equals("Running");
-    boolean isModifiable = isRunning && privateActionsAllowed(conf);
+    boolean isModifiable = label.equals("Running") &&
+        privateActionsAllowed(conf);
     StringBuilder sb = new StringBuilder();
-    String bgColor;
-
-    if (isRunning) {
-      bgColor = YELLOW;
-    } else if (label.equals("Failed")) {
-      bgColor = RED;
-    } else /* if (label.equals("Completed")) */ {
-      bgColor = GREEN;
-    }
-
-    sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\" "
-              + "bgcolor=\"" + bgColor + "\">\n");
+    
+    sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
 
     if (jobs.size() > 0) {
       if (isModifiable) {
@@ -338,7 +323,7 @@ class JSPUtil {
         String name = HtmlQuoting.quoteHtmlChars(profile.getJobName());
         String jobpri = job.getPriority().toString();
         String schedulingInfo =
-          HtmlQuoting.quoteHtmlChars(status.getSchedulingInfo());
+          HtmlQuoting.quoteHtmlChars(job.getStatus().getSchedulingInfo());
 
         if (isModifiable) {
           sb.append("<tr><td><input TYPE=\"checkbox\" " +
@@ -406,19 +391,7 @@ class JSPUtil {
       sb.append("</tr>\n");
       for (int i = 0; i < 100 && iterator.hasNext(); i++) {
         JobStatus status = iterator.next();
-        String runState = JobStatus.getJobRunState(status.getRunState());
-        String bgColor = null;
-
-        if (runState.equals("SUCCEEDED")) {  // most common state
-          bgColor = GREEN;
-        } else if (runState.equals("FAILED") || runState.equals("KILLED")) {
-          bgColor = RED;
-        } // else leave white (shouldn't exist:  UNKNOWN, RUNNING, PREP)
-        if (bgColor != null) {
-          sb.append("<tr bgcolor=\"" + bgColor + "\">\n");
-        } else {
-          sb.append("<tr>");
-        }
+        sb.append("<tr>");
         sb.append(
             "<td id=\"job_" + rowId + "\">" + 
               "<a href=\"jobdetailshistory.jsp?logFile=" + 
@@ -430,7 +403,7 @@ class JSPUtil {
               status.getJobPriority().toString() + "</td>" +
             "<td id=\"user_" + rowId + "\">" + HtmlQuoting.quoteHtmlChars(status.getUsername()) + "</td>" +
             "<td id=\"name_" + rowId + "\">" + HtmlQuoting.quoteHtmlChars(status.getJobName()) + "</td>" +
-            "<td>" + runState + "</td>" +
+            "<td>" + JobStatus.getJobRunState(status.getRunState()) + "</td>" +
             "<td>" + new Date(status.getStartTime()) + "</td>" +
             "<td>" + new Date(status.getFinishTime()) + "</td>" +
             
@@ -530,7 +503,7 @@ class JSPUtil {
           jobHistoryCache.entrySet().iterator();
         String removeJobId = it.next().getKey();
         it.remove();
-        LOG.info("Job History file removed from cache "+removeJobId);
+        LOG.info("Job History file removed form cache "+removeJobId);
       }
     }
 
@@ -556,7 +529,8 @@ class JSPUtil {
       HttpServletResponse response, final JobTracker jobTracker,
       final FileSystem fs, final Path logFile) throws IOException,
       InterruptedException, ServletException {
-    String jobid = JobHistory.getJobIDFromHistoryFilePath(logFile).toString();
+    String jobid =
+        JobHistory.getJobIDFromHistoryFilePath(logFile).toString();
     String user = request.getRemoteUser();
 
     JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobConf.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobConf.java Tue Mar  8 05:54:02 2011
@@ -239,32 +239,11 @@ public class JobConf extends Configurati
    */
   public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
     JobContext.REDUCE_JAVA_OPTS;
-
-  /**
-   * Configuration key to set the Java command-line options for the 'uber'
-   * tasks.
-   *
-   * Java opts for the task tracker "uber"/combo map+reduce processes.
-   * The following symbol, if present, will be interpolated: @taskid@.
-   * It is replaced by current TaskID. Any other occurrences of '@' will go
-   * unchanged.
-   * For example, to enable verbose gc logging to a file named for the taskid
-   * in /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
-   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
-   *
-   * The configuration variable {@link #MAPRED_UBER_TASK_ULIMIT} can be used
-   * to control the maximum virtual memory of the map processes.
-   *
-   * The configuration variable {@link #MAPRED_UBER_TASK_ENV} can be used to
-   * pass other environment variables to the map processes.
-   */
-  public static final String MAPRED_UBER_TASK_JAVA_OPTS =
-    JobContext.UBERTASK_JAVA_OPTS;
-
+  
   public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
   
   /**
-   * Configuration key to set the maximum virtual memory available to the child
+   * Configuration key to set the maximum virutal memory available to the child
    * map and reduce tasks (in kilo-bytes).
    * 
    * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
@@ -277,7 +256,7 @@ public class JobConf extends Configurati
   public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
 
   /**
-   * Configuration key to set the maximum virtual memory available to the
+   * Configuration key to set the maximum virutal memory available to the
    * map tasks (in kilo-bytes).
    * 
    * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
@@ -286,7 +265,7 @@ public class JobConf extends Configurati
   public static final String MAPRED_MAP_TASK_ULIMIT = JobContext.MAP_ULIMIT;
   
   /**
-   * Configuration key to set the maximum virtual memory available to the
+   * Configuration key to set the maximum virutal memory available to the
    * reduce tasks (in kilo-bytes).
    * 
    * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
@@ -297,16 +276,6 @@ public class JobConf extends Configurati
 
 
   /**
-   * Configuration key to set the maximum virtual memory available to the
-   * 'uber' task (in kilobytes).
-   *
-   * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
-   *       via {@link #MAPRED_MAP_TASK_JAVA_OPTS}, else the VM might not start.
-   */
-  public static final String MAPRED_UBER_TASK_ULIMIT =
-    JobContext.UBERTASK_ULIMIT;
-
-  /**
    * Configuration key to set the environment of the child map/reduce tasks.
    * 
    * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
@@ -325,7 +294,8 @@ public class JobConf extends Configurati
   public static final String MAPRED_TASK_ENV = "mapred.child.env";
 
   /**
-   * Configuration key to set the environment of the child map tasks.
+   * Configuration key to set the maximum virutal memory available to the
+   * map tasks.
    * 
    * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
    * reference existing environment variables via <code>$key</code>.
@@ -339,7 +309,8 @@ public class JobConf extends Configurati
   public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
   
   /**
-   * Configuration key to set the environment of the child reduce tasks.
+   * Configuration key to set the maximum virutal memory available to the
+   * reduce tasks.
    * 
    * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
    * reference existing environment variables via <code>$key</code>.
@@ -376,22 +347,7 @@ public class JobConf extends Configurati
    * Default logging level for map/reduce tasks.
    */
   public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
-
-  /**
-   * Configuration key to set the environment of the child 'uber' tasks.
-   *
-   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
-   * reference existing environment variables via <code>$key</code>.
-   *
-   * Example:
-   * <ul>
-   *   <li> A=foo - This will set the env variable A to foo. </li>
-   *   <li> B=$X:c - This will inherit tasktracker's X env variable. </li>
-   * </ul>
-   */
-  public static final String MAPRED_UBER_TASK_ENV =
-    JobContext.UBERTASK_ENV;
-
+  
   /**
    * Construct a map/reduce job configuration.
    */

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  8 05:54:02 2011
@@ -136,11 +136,7 @@ public class JobInProgress {
   // speculative tasks separately 
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
-
-  // uber internals tracked here to avoid need for new TIP get-accessors
-  private int uberMapTasks = 0;
-  private int uberReduceTasks = 0;
-
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -149,8 +145,7 @@ public class JobInProgress {
   private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
-  private /* final */ boolean jobSetupCleanupNeeded;
-  private boolean uberMode = false;
+  private final boolean jobSetupCleanupNeeded;
 
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
@@ -476,6 +471,11 @@ public class JobInProgress {
       this.submitHostName = conf.getJobSubmitHostName();
       this.submitHostAddress = conf.getJobSubmitHostAddress();
 
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
           MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
       this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
@@ -503,49 +503,36 @@ public class JobInProgress {
       }
     }
   }
-
-  /**
-   * Build a data structure that inverts the usual TIP -&gt; host lookup:
-   * i.e., given a host (TaskTracker), now can find all of this job's TIPs
-   * that have one or more splits on that host.
-   */
-  Map<Node, List<TaskInProgress>> createCache(TaskSplitMetaInfo[] splits,
-                                              int maxCacheLevel) {
+  
+  Map<Node, List<TaskInProgress>> createCache(
+                         TaskSplitMetaInfo[] splits, int maxLevel) {
     Map<Node, List<TaskInProgress>> cache = 
-      new IdentityHashMap<Node, List<TaskInProgress>>(maxCacheLevel);
-//GRR PERF FIXME:  wtf??  claiming hash size is maxCacheLevel == 2(!)
-//   potentially:  size == num splits * num split locations * numlevels (if every split location is on a separate host)...hmmm, possible for same split location to show up in more than 1 level?
+      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
-//GRR Q:  what are "splits" in this context?  if text file, does every 10,000 lines (or whatever) in same block constitute a split?
     for (int i = 0; i < splits.length; i++) {
-      // originally UberTask was a flavor of MapTask (=> maps[0]), but the
-      // current design makes it a flavor of ReduceTask instead (=> reduces[0])
-      TaskInProgress map = uberMode? reduces[0] : maps[i];
       String[] splitLocations = splits[i].getLocations();
       if (splitLocations.length == 0) {
-//GRR Q:  don't understand this:  if given MapTask's split has no locations => non-local?  why not fail immediately if no data?  or treat as local since doesn't matter where? (or do we want to avoid contention for node that might have local data for some other map?)
-        nonLocalMaps.add(map);
+        nonLocalMaps.add(maps[i]);
         continue;
       }
 
-      for (String host: splitLocations) {
+      for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
-        // node looks like "/default-rack/localhost"
-        LOG.info("tip:" + map.getTIPId() + " has split on node:" + node);
-        for (int j = 0; j < maxCacheLevel; j++) {
+        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
+        for (int j = 0; j < maxLevel; j++) {
           List<TaskInProgress> hostMaps = cache.get(node);
           if (hostMaps == null) {
             hostMaps = new ArrayList<TaskInProgress>();
             cache.put(node, hostMaps);
-            hostMaps.add(map);
+            hostMaps.add(maps[i]);
           }
-          // Check whether the hostMaps already contains an entry for a TIP.
-          // This will be true for nodes that are racks and multiple nodes in
-          // the rack contain the input for a tip. Note that if it already
-          // exists in the hostMaps, it must be the last element there since
-          // we process one TIP at a time sequentially in the split-size order
-          if (hostMaps.get(hostMaps.size() - 1) != map) {
-            hostMaps.add(map);
+          //check whether the hostMaps already contains an entry for a TIP
+          //This will be true for nodes that are racks and multiple nodes in
+          //the rack contain the input for a tip. Note that if it already
+          //exists in the hostMaps, it must be the last element there since
+          //we process one TIP at a time sequentially in the split-size order
+          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
+            hostMaps.add(maps[i]);
           }
           node = node.getParent();
         }
@@ -570,13 +557,6 @@ public class JobInProgress {
     return user;
   }
 
-  /**
-   * Was the job small enough to uberize?
-   */
-  public boolean getUberMode() {
-    return uberMode;
-  }
-
   boolean getMapSpeculativeExecution() {
     return hasSpeculativeMaps;
   }
@@ -666,65 +646,21 @@ public class JobInProgress {
 
     checkTaskLimits();
 
-    for (int i = 0; i < numMapTasks; ++i) {
-      inputLength += taskSplitMetaInfo[i].getInputDataLength();
-    }
-
-    JobConf sysConf = jobtracker.getConf();
-    int sysMaxMaps = sysConf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
-    int sysMaxReduces = // code doesn't support > 1 reduce, so force limit:
-        Math.min(1, sysConf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1));
-    long sysMaxBytes = sysConf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
-        sysConf.getLong("dfs.block.size", 64*1024*1024));
-
-    // user has overall veto power over uberization, or user can set more
-    // stringent limits than the system specifies, but user may not exceed
-    // system limits (for now, anyway)
-    uberMode = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true)
-        && numMapTasks > 0  // temporary restriction until can test reduce-only
-        && numMapTasks <= Math.min(sysMaxMaps,
-            conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, sysMaxMaps))
-        && numReduceTasks <= Math.min(sysMaxReduces,
-            conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, sysMaxReduces))
-        && inputLength <= Math.min(sysMaxBytes,
-            conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, sysMaxBytes));
-
-    if (uberMode) {
-      // save internal details for UI
-      uberMapTasks = numMapTasks;
-      uberReduceTasks = numReduceTasks;
-      //GRR FIXME:  check jobSetupCleanupNeeded and set "uberSetupTasks" and
-      //            "uberCleanupTasks" for UI, too?
-
-      // this method modifies numMapTasks (-> 0), numReduceTasks (-> 1), and
-      // jobSetupCleanupNeeded (-> false)  [and actually creates a TIP, not a
-      // true Task; latter is created in TaskInProgress's addRunningTask()
-      // method]
-      createUberTask(jobFile.toString(), taskSplitMetaInfo);
-    }
-
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 
-    if (!uberMode) {
-      createMapTasks(jobFile.toString(), taskSplitMetaInfo);
-    } else {
-      // must create this array even if zero elements:
-      maps = new TaskInProgress[numMapTasks];
-    }
-
-    if (numMapTasks > 0 || (uberMode && uberMapTasks > 0)) {
-      // this is needed even if all tasks are shielded by uber event horizon
-      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
+    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
+    
+    if (numMapTasks > 0) { 
+      nonRunningMapCache = createCache(taskSplitMetaInfo,
+          maxLevel);
     }
-
+        
     // set the launch time
-    launchTime = JobTracker.getClock().getTime();
-
-    if (!uberMode) {
-      createReduceTasks(jobFile.toString());
-    }
+    this.launchTime = JobTracker.getClock().getTime();
 
+    createReduceTasks(jobFile.toString());
+    
     // Calculate the minimum number of maps to be complete before 
     // we should start scheduling reduces
     completedMapsForReduceSlowstart = 
@@ -732,16 +668,9 @@ public class JobInProgress {
           (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
            numMapTasks));
-
-    // Creates setup[] and cleanup[] arrays of TIPs, with TaskID indices above
-    // regular map and reduce values.  In uber mode, UberTask itself handles
-    // setup and cleanup duties, and jobSetupCleanupNeeded is forced false.
-    // But we skip initSetupCleanupTasks() call in that case anyway to avoid
-    // a potentially misleading log msg.
-    if (!uberMode) {
-      initSetupCleanupTasks(jobFile.toString());
-    }
-
+    
+    initSetupCleanupTasks(jobFile.toString());
+    
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
       if(jobInitKillStatus.killed) {
@@ -749,25 +678,18 @@ public class JobInProgress {
         throw new KillInterruptedException("Job " + jobId + " killed in init");
       }
     }
-
+    
     tasksInited.set(true);
-
-//GRR Q:  what's difference between profile.getJobID() and jobId ?
     JobInitedEvent jie = new JobInitedEvent(
-        profile.getJobID(), launchTime, numMapTasks, numReduceTasks,
-        uberMode, uberMapTasks, uberReduceTasks,
+        profile.getJobID(),  this.launchTime,
+        numMapTasks, numReduceTasks,
         JobStatus.getJobRunState(JobStatus.PREP));
+    
     jobHistory.logEvent(jie, jobId);
-
+   
     // Log the number of map and reduce tasks
-    if (!uberMode) {
-      LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
-               + " map tasks and " + numReduceTasks + " reduce tasks.");
-    } else {
-      LOG.info("Job " + jobId + " initialized successfully with 1 UberTask "
-               + "(running as a ReduceTask) containing " + uberMapTasks
-               + " sub-MapTasks and " + uberReduceTasks + " sub-ReduceTasks.");
-    }
+    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 
+             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
 
   // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
@@ -828,9 +750,10 @@ public class JobInProgress {
   }
 
   synchronized void createMapTasks(String jobFile, 
-		                   TaskSplitMetaInfo[] splits) {
+		  TaskSplitMetaInfo[] splits) {
     maps = new TaskInProgress[numMapTasks];
-    for (int i=0; i < numMapTasks; ++i) {
+    for(int i=0; i < numMapTasks; ++i) {
+      inputLength += splits[i].getInputDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
                                    jobtracker, conf, this, 
@@ -838,6 +761,7 @@ public class JobInProgress {
     }
     LOG.info("Input size for job " + jobId + " = " + inputLength
         + ". Number of splits = " + splits.length);
+
   }
 
   synchronized void createReduceTasks(String jobFile) {
@@ -851,52 +775,38 @@ public class JobInProgress {
     }
   }
 
-  synchronized void createUberTask(String jobFile, TaskSplitMetaInfo[] splits) {
-    reduces = new TaskInProgress[1];
-    reduces[0] = new TaskInProgress(jobId, jobFile, splits, numMapTasks,
-                                    numReduceTasks, 0, jobtracker, conf,
-                                    this, numSlotsPerReduce,
-                                    jobSetupCleanupNeeded);
-    nonRunningReduces.add(reduces[0]);  // note:  no map analogue
-//GRR FIXME?  any conditions under which numSlotsPerReduce would not be correct thing to use for an UberTask (running in a reduce slot)?
-    numMapTasks = 0;
-    numReduceTasks = 1;
-    jobSetupCleanupNeeded = false;
-    LOG.info("Input size for job " + jobId + " = " + inputLength
-        + ". Number of splits = " + splits.length + ". UberTasking!");
-  }
-
+  
   synchronized void initSetupCleanupTasks(String jobFile) {
     if (!jobSetupCleanupNeeded) {
       LOG.info("Setup/Cleanup not needed for job " + jobId);
       // nothing to initialize
       return;
     }
-    // Create two cleanup tips, one map and one reduce.  Only one will be used.
+    // create two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
 
-    // Create cleanup map tip. This map doesn't use any splits; just assign an
-    // empty split.
+    // cleanup map tip. This map doesn't use any splits. Just assign an empty
+    // split.
     TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
 
-    // Create cleanup reduce tip.
+    // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks, jobtracker, conf, this, 1);
     cleanup[1].setJobCleanupTask();
 
-    // Create two setup tips, one map and one reduce.  Only one will be used.
+    // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
 
-    // Create setup map tip. This map doesn't use any splits; just assign an
-    // empty split.
+    // setup map tip. This map doesn't use any split. Just assign an empty
+    // split.
     setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks + 1, 1);
     setup[0].setJobSetupTask();
 
-    // Create setup reduce tip.
+    // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();
@@ -1051,7 +961,7 @@ public class JobInProgress {
       break;
       default:
       {
-        tasks = new TaskInProgress[0];
+          tasks = new TaskInProgress[0];
       }
       break;
     }
@@ -1068,7 +978,7 @@ public class JobInProgress {
   }
   
   /**
-   * Return the runningMapCache.  Used only by unit test(s).
+   * Return the runningMapCache
    * @return
    */
   Map<Node, Set<TaskInProgress>> getRunningMapCache()
@@ -1213,7 +1123,7 @@ public class JobInProgress {
         taskEvent = new TaskCompletionEvent(
                                             taskCompletionEventTracker, 
                                             taskid,
-                                            tip.getIdWithinJob(),
+                                            tip.idWithinJob(),
                                             status.getIsMap() &&
                                             !tip.isJobCleanupTask() &&
                                             !tip.isJobSetupTask(),
@@ -1269,7 +1179,7 @@ public class JobInProgress {
         }
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
                                             taskid,
-                                            tip.getIdWithinJob(),
+                                            tip.idWithinJob(),
                                             status.getIsMap() &&
                                             !tip.isJobCleanupTask() &&
                                             !tip.isJobSetupTask(),
@@ -1318,10 +1228,6 @@ public class JobInProgress {
         this.status.setReduceProgress((float) (this.status.reduceProgress() + 
                                            (progressDelta / reduces.length)));
       }
-      if (uberMode &&
-          (schedulingInfo == null || schedulingInfo.toString().equals(""))) {
-        setSchedulingInfo("");  // force method call so uber info will be added
-      }
     }
   }
 
@@ -1335,14 +1241,14 @@ public class JobInProgress {
   }
   
   /**
-   * Returns map-phase counters by summing over all map tasks in progress.
+   *  Returns map phase counters by summing over all map tasks in progress.
    */
   public synchronized Counters getMapCounters() {
     return incrementTaskCounters(new Counters(), maps);
   }
     
   /**
-   * Returns reduce-phase counters by summing over all reduce tasks in progress.
+   *  Returns reduce phase counters by summing over all reduce tasks in progress.
    */
   public synchronized Counters getReduceCounters() {
     return incrementTaskCounters(new Counters(), reduces);
@@ -1758,7 +1664,7 @@ public class JobInProgress {
     // when exactly to increment the locality counters. The only solution is to 
     // increment the counters for all the tasks irrespective of 
     //    - whether the tip is running or not
-    //    - whether it's a speculative task or not
+    //    - whether its a speculative task or not
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
@@ -1800,7 +1706,7 @@ public class JobInProgress {
     
   public static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
-    // Convert the trackerName to its host name
+    // Convert the trackerName to it's host name
     int indexOfColon = trackerName.indexOf(":");
     String trackerHostName = (indexOfColon == -1) ? 
       trackerName : 
@@ -1930,10 +1836,9 @@ public class JobInProgress {
   }
     
   /**
-   * Get the blacklisted trackers for the (single) job.  Note that
-   * JobTracker's blacklist and graylist apply to all jobs.
+   * Get the black listed trackers for the job
    * 
-   * @return list of blacklisted tracker names for this job
+   * @return List of blacklisted tracker names
    */
   List<String> getBlackListedTrackers() {
     List<String> blackListedTrackers = new ArrayList<String>();
@@ -1946,19 +1851,19 @@ public class JobInProgress {
   }
   
   /**
-   * Get the number of 'flaky' tasktrackers for a given job.
+   * Get the no. of 'flaky' tasktrackers for a given job.
    * 
-   * @return the number of 'flaky' tasktrackers for a given job.
+   * @return the no. of 'flaky' tasktrackers for a given job.
    */
   int getNoOfBlackListedTrackers() {
     return flakyTaskTrackers;
   }
     
   /**
-   * Get the information on tasktrackers and number of errors which occurred
+   * Get the information on tasktrackers and no. of errors which occurred
    * on them for a given job. 
    * 
-   * @return the map of tasktrackers and number of errors which occurred
+   * @return the map of tasktrackers and no. of errors which occurred
    *         on them for a given job. 
    */
   synchronized Map<String, Integer> getTaskTrackerErrors() {
@@ -2040,7 +1945,7 @@ public class JobInProgress {
       return;
     }
 
-    for (String host: splitLocations) {
+    for(String host: splitLocations) {
       Node node = jobtracker.getNode(host);
 
       for (int j = 0; j < maxLevel; ++j) {
@@ -2078,11 +1983,10 @@ public class JobInProgress {
     if (nonRunningMapCache == null) {
       LOG.warn("Non-running cache for maps missing!! "
                + "Job details are missing.");
-//GRR Q:  why not throwing exception or otherwise killing self and/or job?  seems like logic error...or is this expected sometimes?
       return;
     }
 
-    // 1. It's added everywhere since other nodes (having this split local)
+    // 1. Its added everywhere since other nodes (having this split local)
     //    might have removed this tip from their local cache
     // 2. Give high priority to failed tip - fail early
 
@@ -2094,7 +1998,7 @@ public class JobInProgress {
       return;
     }
 
-    for (String host: splitLocations) {
+    for(String host: splitLocations) {
       Node node = jobtracker.getNode(host);
       
       for (int j = 0; j < maxLevel; ++j) {
@@ -2137,29 +2041,29 @@ public class JobInProgress {
     while (iter.hasNext()) {
       TaskInProgress tip = iter.next();
 
-      // Select a tip if all of the following are true:
+      // Select a tip if
       //   1. runnable   : still needs to be run and is not completed
-      //   2. !running   : no other node is running it
-      //   3. previous   : no earlier attempt has failed on this host, OR
-      //                   earlier attempts have failed on EVERY host
-      // A TIP is removed from the list if any of the following are true:
-      // (1) the tip is scheduled
-      // (2) the passed list is a level 0 (host) cache (removeFailedTip true)
-      // (3) the tip is non-schedulable (running, killed, complete)
+      //   2. ~running   : no other node is running it
+      //   3. earlier attempt failed : has not failed on this host
+      //                               or has failed on all the hosts
+      // A TIP is removed from the list if 
+      // (1) this tip is scheduled
+      // (2) if the passed list is a level 0 (host) cache
+      // (3) when the TIP is non-schedulable (running, killed, complete)
       if (tip.isRunnable() && !tip.isRunning()) {
-        // check if the tip has failed on this host or if it has failed on
-        // all the nodes
+        // check if the tip has failed on this host
         if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
              tip.getNumberOfFailedMachines() >= numUniqueHosts) {
+          // check if the tip has failed on all the nodes
           iter.remove();
           return tip;
         } else if (removeFailedTip) { 
           // the case where we want to remove a failed tip from the host cache
-          // point#2 in the TIP-removal logic above
+          // point#2 in the TIP removal logic above
           iter.remove();
         }
       } else {
-        // see point#3 in the comment above for TIP-removal logic
+        // see point#3 in the comment above for TIP removal logic
         iter.remove();
       }
     }
@@ -2244,8 +2148,8 @@ public class JobInProgress {
    * @param maxCacheLevel The maximum topology level until which to schedule
    *                      maps. 
    *                      A value of {@link #anyCacheLevel} implies any 
-   *                      available task (node-local, rack-local, off-switch
-   *                      and speculative tasks).
+   *                      available task (node-local, rack-local, off-switch and 
+   *                      speculative tasks).
    *                      A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
    *                      off-switch/speculative tasks should be scheduled.
    * @return the index in tasks of the selected task (or -1 for no task)
@@ -2290,17 +2194,16 @@ public class JobInProgress {
     // For scheduling a map task, we have two caches and a list (optional)
     //  I)   one for non-running task
     //  II)  one for running task (this is for handling speculation)
-    //  III) a list of TIPs that have empty locations (e.g., dummy splits);
+    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
     //       the list is empty if all TIPs have associated locations
 
-    // First a lookup is done on the non-running cache, and on a miss, a lookup
-    // is done on the running cache. The order for lookup within the cache:
+    // First a look up is done on the non-running cache and on a miss, a look 
+    // up is done on the running cache. The order for lookup within the cache:
     //   1. from local node to root [bottom up]
     //   2. breadth wise for all the parent nodes at max level
 
     // We fall to linear scan of the list (III above) if we have misses in the 
     // above caches
-//GRR FIXME?  within _this_ function?  not seeing it in the code...
 
     Node node = jobtracker.getNode(tts.getHost());
     
@@ -2309,28 +2212,27 @@ public class JobInProgress {
     // 
 
     // 1. check from local node to the root [bottom up cache lookup]
-    //    i.e., if the cache is available and the host has been resolved
+    //    i.e if the cache is available and the host has been resolved
     //    (node!=null)
     if (node != null) {
       Node key = node;
       int level = 0;
-      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask
-      // is called to schedule any task (local, rack-local, off-switch or
-      // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e., -1) if
-      // findNewMapTask is to schedule only off-switch/speculative tasks
+      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
+      // called to schedule any task (local, rack-local, off-switch or
+      // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
+      // findNewMapTask is to schedule only off-switch/speculative
+      // tasks
       int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
-      for (level = 0; level < maxLevelToSchedule; ++level) {
-        List <TaskInProgress> cacheForLevel /* a.k.a. hostMap */ = nonRunningMapCache.get(key);
+      for (level = 0;level < maxLevelToSchedule; ++level) {
+        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
         if (cacheForLevel != null) {
-          // this may remove a TIP from cacheForLevel (but not necessarily tip):
-          tip = findTaskFromList(cacheForLevel, tts,
-                                 numUniqueHosts, level == 0);
+          tip = findTaskFromList(cacheForLevel, tts, 
+              numUniqueHosts,level == 0);
           if (tip != null) {
             // Add to running cache
             scheduleMap(tip);
 
-            // remove the cache if it's empty
-//GRR FIXME:  findTaskFromList() may return null even though it removed a tip => might be empty anyway?  (seems like this if-block should move up)
+            // remove the cache if its empty
             if (cacheForLevel.size() == 0) {
               nonRunningMapCache.remove(key);
             }
@@ -2345,8 +2247,6 @@ public class JobInProgress {
       if (level == maxCacheLevel) {
         return -1;
       }
-//GRR FIXME:  seems like check should be against maxLevelToSchedule:  if user specifies local (maxLevel == 0 or 1) but TT specifies any (maxCacheLevel == maxLevel+1), then maxLevelToSchedule == maxLevel, level == maxLevel == maxLevelToSchedule, and level != maxCacheLevel   => fall through to non-local
-//            [can this happen, or must TT value match user's setting?  presumably can, else would (should) use same variable for both...]
     }
 
     //2. Search breadth-wise across parents at max level for non-running 
@@ -2406,7 +2306,7 @@ public class JobInProgress {
         return tip.getIdWithinJob();
       }
     }
-    return -1;
+   return -1;
   }
 
   private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, 
@@ -2923,7 +2823,7 @@ public class JobInProgress {
   }
 
   /**
-   * The job is done since all its component tasks are either
+   * The job is done since all it's component tasks are either
    * successful or have failed.
    */
   private void jobComplete() {
@@ -3098,7 +2998,7 @@ public class JobInProgress {
   }
 
   /**
-   * Kill the job and all its component tasks. This method should be called from
+   * Kill the job and all its component tasks. This method should be called from 
    * jobtracker and should return fast as it locks the jobtracker.
    */
   public void kill() {
@@ -3120,7 +3020,7 @@ public class JobInProgress {
    * Fails the job and all its component tasks. This should be called only from
    * {@link JobInProgress} or {@link JobTracker}. Look at 
    * {@link JobTracker#failJob(JobInProgress)} for more details.
-   * Note that the job doesn't expect itself to be failed before it's inited. 
+   * Note that the job doesnt expect itself to be failed before its inited. 
    * Only when the init is done (successfully or otherwise), the job can be 
    * failed. 
    */
@@ -3448,10 +3348,10 @@ public class JobInProgress {
        this.runningReduces = null;
 
      }
-     // remove job's delegation tokens
-     if (conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+     // remove jobs delegation tokens
+     if(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) {
        DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
-     } // else don't remove it. May be used by spawned tasks
+     } // else don't remove it.May be used by spawned tasks
    }
 
   /**
@@ -3563,19 +3463,8 @@ public class JobInProgress {
   }
   
   public synchronized void setSchedulingInfo(Object schedulingInfo) {
-    // UberTasking is a kind of scheduling decision, so we append it here
-    if (uberMode) {
-      StringBuilder sb = new StringBuilder(256);
-      if (schedulingInfo != null && !schedulingInfo.toString().equals("")) {
-        sb.append(schedulingInfo).append(" ");
-      }
-      sb.append("UberTask with ").append(uberMapTasks).append(" map and ")
-        .append(uberReduceTasks).append(" reduce subtasks.");
-      this.schedulingInfo = sb.toString();  // or could just use sb itself...
-    } else {
-      this.schedulingInfo = schedulingInfo;
-    }
-    this.status.setSchedulingInfo(this.schedulingInfo.toString());
+    this.schedulingInfo = schedulingInfo;
+    this.status.setSchedulingInfo(schedulingInfo.toString());
   }
   
   /**
@@ -3804,7 +3693,7 @@ public class JobInProgress {
     
     // write TokenStorage out
     tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
-    LOG.info("jobToken generated and stored with user's keys in "
+    LOG.info("jobToken generated and stored with users keys in "
         + keysFile.toUri().getPath());
   }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Tue Mar  8 05:54:02 2011
@@ -185,8 +185,8 @@ class JobQueueTaskScheduler extends Task
             // Try all jobs again for the next Map task 
             break;
           }
-
-          // Try to schedule a non-local Map task
+          
+          // Try to schedule a non-local Map task
           t = 
             job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts());

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Mar  8 05:54:02 2011
@@ -2460,7 +2460,7 @@ public class JobTracker implements MRCon
     isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
     // Check for new tasks to be executed on the tasktracker
     if (acceptNewTasks && !isBlacklisted) {
-      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
+      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
@@ -2493,7 +2493,10 @@ public class JobTracker implements MRCon
     }
 
     // Check for tasks whose outputs can be saved
-    actions.addAll(getTasksToSave(status));
+    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
+    if (commitTasksList != null) {
+      actions.addAll(commitTasksList);
+    }
 
     // calculate next heartbeat interval and put in heartbeat response
     int nextInterval = getNextHeartbeatInterval();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmTask.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmTask.java Tue Mar  8 05:54:02 2011
@@ -34,37 +34,27 @@ import org.apache.hadoop.io.Writable;
 public class JvmTask implements Writable {
   Task t;
   boolean shouldDie;
-
   public JvmTask(Task t, boolean shouldDie) {
     this.t = t;
     this.shouldDie = shouldDie;
   }
-
   public JvmTask() {}
-
   public Task getTask() {
     return t;
   }
-
   public boolean shouldDie() {
     return shouldDie;
   }
-
   public void write(DataOutput out) throws IOException {
     out.writeBoolean(shouldDie);
     if (t != null) {
       out.writeBoolean(true);
       out.writeBoolean(t.isMapTask());
-      if (!t.isMapTask()) {
-        // which kind of ReduceTask, uber or regular?
-        out.writeBoolean(t.isUberTask());
-      }
       t.write(out);
     } else {
       out.writeBoolean(false);
     }
   }
-
   public void readFields(DataInput in) throws IOException {
     shouldDie = in.readBoolean();
     boolean taskComing = in.readBoolean();
@@ -73,12 +63,7 @@ public class JvmTask implements Writable
       if (isMap) {
         t = new MapTask();
       } else {
-        boolean isUber = in.readBoolean();
-        if (isUber) {
-          t = new UberTask();
-        } else {
-          t = new ReduceTask();
-        }
+        t = new ReduceTask();
       }
       t.readFields(in);
     }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java Tue Mar  8 05:54:02 2011
@@ -45,24 +45,15 @@ class LaunchTaskAction extends TaskTrack
   
   public void write(DataOutput out) throws IOException {
     out.writeBoolean(task.isMapTask());
-    if (!task.isMapTask()) {
-      // which flavor of ReduceTask, uber or regular?
-      out.writeBoolean(task.isUberTask());
-    }
     task.write(out);
   }
-
+  
   public void readFields(DataInput in) throws IOException {
     boolean isMapTask = in.readBoolean();
     if (isMapTask) {
       task = new MapTask();
     } else {
-      boolean isUberTask = in.readBoolean();
-      if (isUberTask) {
-        task = new UberTask();
-      } else {
-        task = new ReduceTask();
-      }
+      task = new ReduceTask();
     }
     task.readFields(in);
   }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java Tue Mar  8 05:54:02 2011
@@ -42,8 +42,6 @@ public class MapOutputFile {
 
   private JobConf conf;
 
-  static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
-  static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
   static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
 
   public MapOutputFile() {
@@ -61,7 +59,7 @@ public class MapOutputFile {
   public Path getOutputFile()
       throws IOException {
     return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, conf);
+        + "file.out", conf);
   }
 
   /**
@@ -74,7 +72,7 @@ public class MapOutputFile {
   public Path getOutputFileForWrite(long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, size, conf);
+        + "file.out", size, conf);
   }
 
   /**
@@ -86,7 +84,7 @@ public class MapOutputFile {
   public Path getOutputIndexFile()
       throws IOException {
     return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, conf);
+        + "file.out.index", conf);
   }
 
   /**
@@ -99,8 +97,7 @@ public class MapOutputFile {
   public Path getOutputIndexFileForWrite(long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
-        size, conf);
+        + "file.out.index", size, conf);
   }
 
   /**

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java Tue Mar  8 05:54:02 2011
@@ -60,7 +60,9 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -89,13 +91,13 @@ class MapTask extends Task {
   private Progress mapPhase;
   private Progress sortPhase;
   
-  {
+  {   // set phase for this task
+    setPhase(TaskStatus.Phase.MAP); 
     getProgress().setStatus("map");
   }
 
   public MapTask() {
     super();
-    this.taskStatus = new MapTaskStatus();
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
@@ -103,9 +105,6 @@ class MapTask extends Task {
                  int numSlotsRequired) {
     super(jobFile, taskId, partition, numSlotsRequired);
     this.splitMetaInfo = splitIndex;
-    this.taskStatus = new MapTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
-                                        TaskStatus.State.UNASSIGNED, "", "", "",
-                                        TaskStatus.Phase.MAP, getCounters());
   }
 
   @Override
@@ -113,22 +112,6 @@ class MapTask extends Task {
     return true;
   }
 
-  /**
-   * Is this really a combo-task masquerading as a plain MapTask?  Nope.
-   */
-  @Override
-  public boolean isUberTask() {
-    return false;
-  }
-
-  void createPhase(TaskStatus.Phase phaseType, String status, float weight) {
-    if (phaseType == TaskStatus.Phase.MAP) {
-      mapPhase = getProgress().addPhase(status, weight);
-    } else {
-      sortPhase = getProgress().addPhase(status, weight);
-    }
-  }
-
   @Override
   public void localizeConfiguration(JobConf conf)
       throws IOException {
@@ -177,12 +160,12 @@ class MapTask extends Task {
   }
 
   /**
-   * This class wraps the user's record reader to update the counters and
-   * progress as records are read.
+   * This class wraps the user's record reader to update the counters and progress
+   * as records are read.
    * @param <K>
    * @param <V>
    */
-  static class TrackedRecordReader<K, V>
+  class TrackedRecordReader<K, V> 
       implements RecordReader<K,V> {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
@@ -246,25 +229,22 @@ class MapTask extends Task {
    * This class skips the records based on the failed ranges from previous 
    * attempts.
    */
-  static class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
-    private MapTask map;
+  class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
     private SkipRangeIterator skipIt;
     private SequenceFile.Writer skipWriter;
     private boolean toWriteSkipRecs;
     private TaskUmbilicalProtocol umbilical;
     private Counters.Counter skipRecCounter;
     private long recIndex = -1;
-
-    SkippingRecordReader(MapTask map, RecordReader<K,V> raw,
-                         TaskUmbilicalProtocol umbilical, TaskReporter reporter)
-    throws IOException {
+    
+    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
+                         TaskReporter reporter) throws IOException{
       super(raw, reporter);
-      this.map = map;
       this.umbilical = umbilical;
       this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS);
-      this.toWriteSkipRecs = map.toWriteSkipRecs() &&  
-        SkipBadRecords.getSkipOutputPath(map.conf)!=null;
-      skipIt = map.getSkipRanges().skipRangeIterator();
+      this.toWriteSkipRecs = toWriteSkipRecs() &&  
+        SkipBadRecords.getSkipOutputPath(conf)!=null;
+      skipIt = getSkipRanges().skipRangeIterator();
     }
     
     public synchronized boolean next(K key, V value)
@@ -288,7 +268,7 @@ class MapTask extends Task {
         skipWriter.close();
       }
       skipRecCounter.increment(skip);
-      map.reportNextRecordRange(umbilical, recIndex);
+      reportNextRecordRange(umbilical, recIndex);
       if (ret) {
         incrCounters();
       }
@@ -304,11 +284,11 @@ class MapTask extends Task {
     @SuppressWarnings("unchecked")
     private void writeSkippedRec(K key, V value) throws IOException{
       if(skipWriter==null) {
-        Path skipDir = SkipBadRecords.getSkipOutputPath(map.conf);
-        Path skipFile = new Path(skipDir, map.getTaskID().toString());
+        Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+        Path skipFile = new Path(skipDir, getTaskID().toString());
         skipWriter = 
           SequenceFile.createWriter(
-              skipFile.getFileSystem(map.conf), map.conf, skipFile,
+              skipFile.getFileSystem(conf), conf, skipFile,
               (Class<K>) createKey().getClass(),
               (Class<V>) createValue().getClass(), 
               CompressionType.BLOCK, getTaskReporter());
@@ -322,9 +302,9 @@ class MapTask extends Task {
     throws IOException, ClassNotFoundException, InterruptedException {
     this.umbilical = umbilical;
 
-    if (isMapTask()) {   //GRR Q: why is this conditional here? ALWAYS true (unless someone derives from MapTask and overrides isMapTask() but not run())
+    if (isMapTask()) {
       mapPhase = getProgress().addPhase("map", 0.667f);
-      sortPhase = getProgress().addPhase("sort", 0.333f);
+      sortPhase  = getProgress().addPhase("sort", 0.333f);
     }
     TaskReporter reporter = startReporter(umbilical);
  
@@ -346,81 +326,79 @@ class MapTask extends Task {
     }
 
     if (useNewApi) {
-      runNewMapper(this, job, splitMetaInfo, umbilical, reporter);
+      runNewMapper(job, splitMetaInfo, umbilical, reporter);
     } else {
-      runOldMapper(this, job, splitMetaInfo, umbilical, reporter);
+      runOldMapper(job, splitMetaInfo, umbilical, reporter);
     }
     done(umbilical, reporter);
   }
 
-  @SuppressWarnings("unchecked")
-  private <T> T getSplitDetails(Path file, long offset) 
+ @SuppressWarnings("unchecked")
+ private <T> T getSplitDetails(Path file, long offset) 
   throws IOException {
-    FileSystem fs = file.getFileSystem(conf);
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<T> cls;
-    try {
-      cls = (Class<T>) conf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className +
-                                          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(conf);
-    Deserializer<T> deserializer =
-      (Deserializer<T>) factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    T split = deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    getCounters().findCounter(
-        TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
+   FileSystem fs = file.getFileSystem(conf);
+   FSDataInputStream inFile = fs.open(file);
+   inFile.seek(offset);
+   String className = Text.readString(inFile);
+   Class<T> cls;
+   try {
+     cls = (Class<T>) conf.getClassByName(className);
+   } catch (ClassNotFoundException ce) {
+     IOException wrap = new IOException("Split class " + className + 
+                                         " not found");
+     wrap.initCause(ce);
+     throw wrap;
+   }
+   SerializationFactory factory = new SerializationFactory(conf);
+   Deserializer<T> deserializer = 
+     (Deserializer<T>) factory.getDeserializer(cls);
+   deserializer.open(inFile);
+   T split = deserializer.deserialize(null);
+   long pos = inFile.getPos();
+   getCounters().findCounter(
+       TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+   inFile.close();
+   return split;
+ }
+  
   @SuppressWarnings("unchecked")
-  static <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  void runOldMapper(final MapTask map,
-                    final JobConf job,
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runOldMapper(final JobConf job,
                     final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
-    InputSplit inputSplit =
-        map.getSplitDetails(new Path(splitIndex.getSplitLocation()),
-                            splitIndex.getStartOffset());
+    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+           splitIndex.getStartOffset());
 
-    map.updateJobWithSplit(job, inputSplit);
+    updateJobWithSplit(job, inputSplit);
     reporter.setInputSplit(inputSplit);
 
     RecordReader<INKEY,INVALUE> rawIn =                  // open input
       job.getInputFormat().getRecordReader(inputSplit, job, reporter);
-    RecordReader<INKEY,INVALUE> in = map.isSkipping() ? 
-        new SkippingRecordReader<INKEY,INVALUE>(map, rawIn, umbilical, reporter) :
+    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
+        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
         new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
-    job.setBoolean(JobContext.SKIP_RECORDS, map.isSkipping());
+    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
 
 
-    int numReduceTasks = map.conf.getNumReduceTasks();
+    int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;
     if (numReduceTasks > 0) {
-      collector = new MapOutputBuffer(map, umbilical, job, reporter);
+      collector = new MapOutputBuffer(umbilical, job, reporter);
     } else { 
-      collector = new DirectMapOutputCollector(map, umbilical, job, reporter);
+      collector = new DirectMapOutputCollector(umbilical, job, reporter);
     }
     MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
-      runner.run(in, new OldOutputCollector(collector, map.conf), reporter);
-      map.mapPhase.complete();
-      map.setPhase(TaskStatus.Phase.SORT);
-      map.statusUpdate(umbilical);
+      runner.run(in, new OldOutputCollector(collector, conf), reporter);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
       collector.flush();
     } finally {
       //close
@@ -538,7 +516,7 @@ class MapTask extends Task {
     }
   }
 
-  private static class NewDirectOutputCollector<K,V>
+  private class NewDirectOutputCollector<K,V>
   extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final org.apache.hadoop.mapreduce.RecordWriter out;
 
@@ -547,12 +525,12 @@ class MapTask extends Task {
     private final Counters.Counter mapOutputRecordCounter;
     
     @SuppressWarnings("unchecked")
-    NewDirectOutputCollector(MapTask map, MRJobConfig jobContext,
-        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
+    NewDirectOutputCollector(MRJobConfig jobContext,
+        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
     throws IOException, ClassNotFoundException, InterruptedException {
       this.reporter = reporter;
-      out = map.outputFormat.getRecordWriter(map.taskContext);
-      mapOutputRecordCounter =
+      out = outputFormat.getRecordWriter(taskContext);
+      mapOutputRecordCounter = 
         reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
     }
 
@@ -575,22 +553,19 @@ class MapTask extends Task {
     }
   }
   
-  private static class NewOutputCollector<K,V>
+  private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
-    private final MapTask map;
     private final MapOutputCollector<K,V> collector;
     private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
     private final int partitions;
 
     @SuppressWarnings("unchecked")
-    NewOutputCollector(MapTask map,
-                       org.apache.hadoop.mapreduce.JobContext jobContext,
+    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                        JobConf job,
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
-      this.map = map;
-      collector = new MapOutputBuffer<K,V>(map, umbilical, job, reporter);
+      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
       partitions = jobContext.getNumReduceTasks();
       if (partitions > 1) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -624,19 +599,17 @@ class MapTask extends Task {
   }
 
   @SuppressWarnings("unchecked")
-  static <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  void runNewMapper(final MapTask map,
-                    final JobConf job,
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runNewMapper(final JobConf job,
                     final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
                              InterruptedException {
-    org.apache.hadoop.mapreduce.TaskAttemptID mapId = map.getTaskID();
-
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, mapId);
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
+                                                                  getTaskID());
     // make a mapper
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
@@ -647,30 +620,31 @@ class MapTask extends Task {
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
     org.apache.hadoop.mapreduce.InputSplit split = null;
-    split = map.getSplitDetails(new Path(splitIndex.getSplitLocation()),
+    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
         splitIndex.getStartOffset());
 
     job.set(TTConfig.TT_MAP_INPUT_SPLITINFO, split.toString());
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>
           (inputFormat.createRecordReader(split, taskContext), reporter);
-
-    job.setBoolean(JobContext.SKIP_RECORDS, map.isSkipping());
+    
+    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
-
+    
     // get an output object
     if (job.getNumReduceTasks() == 0) {
       output = 
-        new NewDirectOutputCollector(map, taskContext, job, umbilical, reporter);
+        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
     } else {
-      output =
-        new NewOutputCollector(map, taskContext, job, umbilical, reporter);
+      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
     }
 
     org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
-        mapContext = 
-          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, mapId,
-              input, output, map.committer, reporter, split);
+    mapContext = 
+      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
+          input, output, 
+          committer, 
+          reporter, split);
 
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
         mapperContext = 
@@ -679,9 +653,9 @@ class MapTask extends Task {
 
     input.initialize(split, mapperContext);
     mapper.run(mapperContext);
-    map.mapPhase.complete();
-    map.setPhase(TaskStatus.Phase.SORT);
-    map.statusUpdate(umbilical);
+    mapPhase.complete();
+    setPhase(TaskStatus.Phase.SORT);
+    statusUpdate(umbilical);
     input.close();
     output.close(mapperContext);
   }
@@ -697,19 +671,20 @@ class MapTask extends Task {
         
   }
 
-  static class DirectMapOutputCollector<K, V>
+  class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
-
+ 
     private RecordWriter<K, V> out = null;
+
     private TaskReporter reporter = null;
+
     private final Counters.Counter mapOutputRecordCounter;
 
     @SuppressWarnings("unchecked")
-    public DirectMapOutputCollector(MapTask map,
-        TaskUmbilicalProtocol umbilical,
+    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
         JobConf job, TaskReporter reporter) throws IOException {
       this.reporter = reporter;
-      String finalName = getOutputName(map.getPartition());
+      String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
@@ -736,12 +711,8 @@ class MapTask extends Task {
     
   }
 
-  static class MapOutputBuffer<K extends Object, V extends Object>
+  private class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
-    private final MapTask map;
-    private final MapOutputFile mapOutputFile;
-    private final Counters.Counter spilledRecordsCounter;
-
     final int partitions;
     final JobConf job;
     final TaskReporter reporter;
@@ -810,14 +781,9 @@ class MapTask extends Task {
     private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
 
     @SuppressWarnings("unchecked")
-    public MapOutputBuffer(MapTask map,
-                           TaskUmbilicalProtocol umbilical, JobConf job,
+    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                            TaskReporter reporter
                            ) throws IOException, ClassNotFoundException {
-      this.map = map;
-      mapOutputFile = map.mapOutputFile;
-      spilledRecordsCounter = map.spilledRecordsCounter;
-
       this.job = job;
       this.reporter = reporter;
       partitions = job.getNumReduceTasks();
@@ -916,8 +882,6 @@ class MapTask extends Task {
       }
     }
 
-    public TaskAttemptID getTaskID() { return map.getTaskID(); }
-
     /**
      * Serialize the key, value to intermediate storage.
      * When this method returns, kvindex must refer to sufficient unused
@@ -1414,7 +1378,7 @@ class MapTask extends Task {
         if (lspillException instanceof Error) {
           final String logMsg = "Task " + getTaskID() + " failed : " +
             StringUtils.stringifyException(lspillException);
-          map.reportFatalError(lspillException, logMsg);
+          reportFatalError(getTaskID(), lspillException, logMsg);
         }
         throw new IOException("Spill failed", lspillException);
       }
@@ -1731,7 +1695,7 @@ class MapTask extends Task {
         return;
       }
       {
-        map.sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
         Merger.considerFinalMergeForProgress();
         
         IndexRecord rec = new IndexRecord();
@@ -1765,7 +1729,7 @@ class MapTask extends Task {
                          segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,
-                         null, spilledRecordsCounter, map.sortPhase.phase());
+                         null, spilledRecordsCounter, sortPhase.phase());
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
@@ -1782,7 +1746,7 @@ class MapTask extends Task {
           //close
           writer.close();
 
-          map.sortPhase.startNextPhase();
+          sortPhase.startNextPhase();
           
           // record offsets
           rec.startOffset = segmentStart;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Tue Mar  8 05:54:02 2011
@@ -42,11 +42,6 @@ class MapTaskStatus extends TaskStatus {
     return true;
   }
 
-  @Override
-  public boolean getIsUber() {
-    return false;
-  }
-
   /**
    * Sets finishTime. 
    * @param finishTime finish time of task.



Mime
View raw message