hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1079191 [2/2] - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ contrib/mumak/src/java/org/apache/hadoop/mapred/ java/ ja...
Date Tue, 08 Mar 2011 05:53:53 GMT
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar  8 05:53:52 2011
@@ -72,10 +72,8 @@ public class ReduceTask extends Task {
 
   private CompressionCodec codec;
 
-
   { 
     getProgress().setStatus("reduce"); 
-    setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
   }
 
   private Progress copyPhase;
@@ -121,14 +119,21 @@ public class ReduceTask extends Task {
 
   public ReduceTask() {
     super();
+    this.taskStatus = new ReduceTaskStatus();
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
                     int partition, int numMaps, int numSlotsRequired) {
     super(jobFile, taskId, partition, numSlotsRequired);
     this.numMaps = numMaps;
+/*
+ */
+    this.taskStatus = new ReduceTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
+                                           TaskStatus.State.UNASSIGNED,
+                                           "", "", "", TaskStatus.Phase.SHUFFLE,
+                                           getCounters());
   }
-  
+
   private CompressionCodec initCodec() {
     // check if map-outputs are to be compressed
     if (conf.getCompressMapOutput()) {
@@ -151,6 +156,45 @@ public class ReduceTask extends Task {
     return false;
   }
 
+  /**
+   * Is this really a combo-task masquerading as a plain MapTask?  Decidedly
+   * not.
+   */
+  @Override
+  public boolean isUberTask() {
+    return false;
+  }
+
+  /**
+   * Allow UberTask (or, potentially, JobInProgress or others) to set up a
+   * deeper Progress hierarchy even if run() is skipped.  If setProgress()
+   * is also needed, it should be called <I>before</I> createPhase() or else
+   * the sub-phases created here will be wiped out.
+   */
+  void createPhase(TaskStatus.Phase phaseType, String status) {
+    if (phaseType == TaskStatus.Phase.SHUFFLE) {
+      copyPhase = getProgress().addPhase(status);
+    } else if (phaseType == TaskStatus.Phase.SORT) {
+      sortPhase = getProgress().addPhase(status);
+    } else /* TaskStatus.Phase.REDUCE */ {
+      reducePhase = getProgress().addPhase(status);
+    }
+  }
+
+  /**
+   * Allow UberTask to traverse the deeper Progress hierarchy in case run() is
+   * skipped.
+   */
+  void completePhase(TaskStatus.Phase phaseType) {
+    if (phaseType == TaskStatus.Phase.SHUFFLE) {
+      copyPhase.complete();
+    } else if (phaseType == TaskStatus.Phase.SORT) {
+      sortPhase.complete();
+    } else /* TaskStatus.Phase.REDUCE */ {
+      reducePhase.complete();
+    }
+  }
+
   public int getNumMaps() { return numMaps; }
   
   /**
@@ -177,37 +221,39 @@ public class ReduceTask extends Task {
   }
   
   // Get the input files for the reducer.
-  private Path[] getMapFiles(FileSystem fs, boolean isLocal) 
+  static Path[] getMapFiles(ReduceTask reduce, FileSystem fs, boolean isLocal) 
   throws IOException {
     List<Path> fileList = new ArrayList<Path>();
     if (isLocal) {
       // for local jobs
-      for(int i = 0; i < numMaps; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i));
+      for (int i = 0; i < reduce.numMaps; ++i) {
+        fileList.add(reduce.mapOutputFile.getInputFile(i));
       }
     } else {
       // for non local jobs
-      for (FileStatus filestatus : mapOutputFilesOnDisk) {
+      for (FileStatus filestatus : reduce.mapOutputFilesOnDisk) {
         fileList.add(filestatus.getPath());
       }
     }
     return fileList.toArray(new Path[0]);
   }
 
-  private class ReduceValuesIterator<KEY,VALUE> 
+  private static class ReduceValuesIterator<KEY,VALUE> 
           extends ValuesIterator<KEY,VALUE> {
-    public ReduceValuesIterator (RawKeyValueIterator in,
+    ReduceTask reduce;
+    public ReduceValuesIterator (ReduceTask reduce, RawKeyValueIterator in,
                                  RawComparator<KEY> comparator, 
                                  Class<KEY> keyClass,
                                  Class<VALUE> valClass,
                                  Configuration conf, Progressable reporter)
       throws IOException {
       super(in, comparator, keyClass, valClass, conf, reporter);
+      this.reduce = reduce;
     }
 
     @Override
     public VALUE next() {
-      reduceInputValueCounter.increment(1);
+      reduce.reduceInputValueCounter.increment(1);
       return moveToNext();
     }
     
@@ -216,12 +262,13 @@ public class ReduceTask extends Task {
     }
     
     public void informReduceProgress() {
-      reducePhase.set(super.in.getProgress().getProgress()); // update progress
+      // update progress:
+      reduce.reducePhase.set(super.in.getProgress().getProgress());
       reporter.progress();
     }
   }
 
-  private class SkippingReduceValuesIterator<KEY,VALUE> 
+  private static class SkippingReduceValuesIterator<KEY,VALUE> 
      extends ReduceValuesIterator<KEY,VALUE> {
      private SkipRangeIterator skipIt;
      private TaskUmbilicalProtocol umbilical;
@@ -234,26 +281,27 @@ public class ReduceTask extends Task {
      private boolean toWriteSkipRecs;
      private boolean hasNext;
      private TaskReporter reporter;
-     
-     public SkippingReduceValuesIterator(RawKeyValueIterator in,
+
+     public SkippingReduceValuesIterator(ReduceTask reduce,
+         RawKeyValueIterator in,
          RawComparator<KEY> comparator, Class<KEY> keyClass,
          Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
          TaskUmbilicalProtocol umbilical) throws IOException {
-       super(in, comparator, keyClass, valClass, conf, reporter);
+       super(reduce, in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
-       this.skipGroupCounter = 
+       this.skipGroupCounter =
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
-       this.skipRecCounter = 
+       this.skipRecCounter =
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
-       this.toWriteSkipRecs = toWriteSkipRecs() &&  
+       this.toWriteSkipRecs = reduce.toWriteSkipRecs() &&
          SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
        this.valClass = valClass;
        this.reporter = reporter;
-       skipIt = getSkipRanges().skipRangeIterator();
+       skipIt = reduce.getSkipRanges().skipRangeIterator();
        mayBeSkip();
      }
-     
+
      public void nextKey() throws IOException {
        super.nextKey();
        mayBeSkip();
@@ -292,16 +340,16 @@ public class ReduceTask extends Task {
        }
        skipGroupCounter.increment(skip);
        skipRecCounter.increment(skipRec);
-       reportNextRecordRange(umbilical, grpIndex);
+       reduce.reportNextRecordRange(umbilical, grpIndex);
      }
      
      @SuppressWarnings("unchecked")
      private void writeSkippedRec(KEY key, VALUE value) throws IOException{
        if(skipWriter==null) {
-         Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
-         Path skipFile = new Path(skipDir, getTaskID().toString());
+         Path skipDir = SkipBadRecords.getSkipOutputPath(reduce.conf);
+         Path skipFile = new Path(skipDir, reduce.getTaskID().toString());
          skipWriter = SequenceFile.createWriter(
-               skipFile.getFileSystem(conf), conf, skipFile,
+               skipFile.getFileSystem(reduce.conf), reduce.conf, skipFile,
                keyClass, valClass, 
                CompressionType.BLOCK, reporter);
        }
@@ -363,8 +411,8 @@ public class ReduceTask extends Task {
     } else {
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();
       rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-                           job.getMapOutputValueClass(), codec, 
-                           getMapFiles(rfs, true),
+                           job.getMapOutputValueClass(), codec,
+                           getMapFiles(this, rfs, true),
                            !conf.getKeepFailedTaskFiles(), 
                            job.getInt(JobContext.IO_SORT_FACTOR, 100),
                            new Path(getTaskID().toString()), 
@@ -382,18 +430,40 @@ public class ReduceTask extends Task {
     RawComparator comparator = job.getOutputValueGroupingComparator();
 
     if (useNewApi) {
-      runNewReducer(job, umbilical, reporter, rIter, comparator, 
+      runNewReducer(this, job, umbilical, reporter, rIter, comparator,
                     keyClass, valueClass);
     } else {
-      runOldReducer(job, umbilical, reporter, rIter, comparator, 
+      runOldReducer(this, job, umbilical, reporter, rIter, comparator,
                     keyClass, valueClass);
     }
     done(umbilical, reporter);
   }
 
+  private static class WrappedOutputCollector<OUTKEY, OUTVALUE>
+  implements OutputCollector<OUTKEY, OUTVALUE> {
+    RecordWriter<OUTKEY, OUTVALUE> out;
+    TaskReporter reporter;
+    Counters.Counter reduceOutputCounter;
+    public WrappedOutputCollector(ReduceTask reduce,
+                                  RecordWriter<OUTKEY, OUTVALUE> out,
+                                  TaskReporter reporter) {
+      this.out = out;
+      this.reporter = reporter;
+      this.reduceOutputCounter = reduce.reduceOutputCounter;
+    }
+
+    public void collect(OUTKEY key, OUTVALUE value)
+    throws IOException {
+      out.write(key, value);
+      reduceOutputCounter.increment(1);
+      // indicate that progress update needs to be sent
+      reporter.progress();
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  void runOldReducer(JobConf job,
+  static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runOldReducer(ReduceTask reduce, JobConf job,
                      TaskUmbilicalProtocol umbilical,
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
@@ -403,7 +473,7 @@ public class ReduceTask extends Task {
     Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
       ReflectionUtils.newInstance(job.getReducerClass(), job);
     // make output collector
-    String finalName = getOutputName(getPartition());
+    String finalName = getOutputName(reduce.getPartition());
 
     FileSystem fs = FileSystem.get(job);
 
@@ -411,15 +481,7 @@ public class ReduceTask extends Task {
       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
     
     OutputCollector<OUTKEY,OUTVALUE> collector = 
-      new OutputCollector<OUTKEY,OUTVALUE>() {
-        public void collect(OUTKEY key, OUTVALUE value)
-          throws IOException {
-          out.write(key, value);
-          reduceOutputCounter.increment(1);
-          // indicate that progress update needs to be sent
-          reporter.progress();
-        }
-      };
+      new WrappedOutputCollector<OUTKEY, OUTVALUE>(reduce, out, reporter);
     
     // apply reduce function
     try {
@@ -427,16 +489,16 @@ public class ReduceTask extends Task {
       boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
         SkipBadRecords.getAutoIncrReducerProcCount(job);
       
-      ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
-          new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
+      ReduceValuesIterator<INKEY,INVALUE> values = reduce.isSkipping() ?
+          new SkippingReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
               comparator, keyClass, valueClass, 
               job, reporter, umbilical) :
-          new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
+          new ReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
           job.getOutputValueGroupingComparator(), keyClass, valueClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
-        reduceInputKeyCounter.increment(1);
+        reduce.reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
         if(incrProcCount) {
           reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
@@ -449,7 +511,7 @@ public class ReduceTask extends Task {
       //Clean up: repeated in catch block below
       reducer.close();
       out.close(reporter);
-      //End of clean up.
+      //End of cleanup.
     } catch (IOException ioe) {
       try {
         reducer.close();
@@ -487,9 +549,38 @@ public class ReduceTask extends Task {
     }
   }
 
+  private static class WrappedRawKeyValueIterator implements RawKeyValueIterator {
+    ReduceTask reduce;
+    TaskReporter reporter;
+    RawKeyValueIterator rawIter;
+    public WrappedRawKeyValueIterator(ReduceTask reduce, TaskReporter reporter,
+                          RawKeyValueIterator rawIter) {
+      this.reduce = reduce;
+      this.rawIter = rawIter;
+      this.reporter = reporter;
+    }
+    public void close() throws IOException {
+      rawIter.close();
+    }
+    public DataInputBuffer getKey() throws IOException {
+      return rawIter.getKey();
+    }
+    public Progress getProgress() {
+      return rawIter.getProgress();
+    }
+    public DataInputBuffer getValue() throws IOException {
+      return rawIter.getValue();
+    }
+    public boolean next() throws IOException {
+      boolean ret = rawIter.next();
+      reporter.setProgress(rawIter.getProgress().getProgress());
+      return ret;
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  void runNewReducer(JobConf job,
+  static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runNewReducer(final ReduceTask reduce, JobConf job,
                      final TaskUmbilicalProtocol umbilical,
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
@@ -498,49 +589,29 @@ public class ReduceTask extends Task {
                      Class<INVALUE> valueClass
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
+    org.apache.hadoop.mapreduce.TaskAttemptID reduceId = reduce.getTaskID();
     // wrap value iterator to report progress.
     final RawKeyValueIterator rawIter = rIter;
-    rIter = new RawKeyValueIterator() {
-      public void close() throws IOException {
-        rawIter.close();
-      }
-      public DataInputBuffer getKey() throws IOException {
-        return rawIter.getKey();
-      }
-      public Progress getProgress() {
-        return rawIter.getProgress();
-      }
-      public DataInputBuffer getValue() throws IOException {
-        return rawIter.getValue();
-      }
-      public boolean next() throws IOException {
-        boolean ret = rawIter.next();
-        reporter.setProgress(rawIter.getProgress().getProgress());
-        return ret;
-      }
-    };
+    rIter = new WrappedRawKeyValueIterator(reduce, reporter, rawIter);
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, reduceId);
     // make a reducer
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
-        outputFormat.getRecordWriter(taskContext);
+        reduce.outputFormat.getRecordWriter(taskContext);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
-      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
-    job.setBoolean("mapred.skip.on", isSkipping());
-    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
-    org.apache.hadoop.mapreduce.Reducer.Context 
-         reducerContext = createReduceContext(reducer, job, getTaskID(),
-                                               rIter, reduceInputKeyCounter, 
-                                               reduceInputValueCounter, 
-                                               trackedRW,
-                                               committer,
-                                               reporter, comparator, keyClass,
-                                               valueClass);
+      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduce.reduceOutputCounter);
+    job.setBoolean(JobContext.SKIP_RECORDS, reduce.isSkipping());
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+      createReduceContext(reducer, job, reduceId, rIter,
+                          reduce.reduceInputKeyCounter, 
+                          reduce.reduceInputValueCounter, 
+                          trackedRW, reduce.committer, reporter,
+                          comparator, keyClass, valueClass);
     reducer.run(reducerContext);
     output.close(reducerContext);
   }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Mar  8 05:53:52 2011
@@ -54,6 +54,11 @@ class ReduceTaskStatus extends TaskStatu
   }
 
   @Override
+  public boolean getIsUber() {
+    return false;
+  }
+
+  @Override
   void setFinishTime(long finishTime) {
     if (shuffleFinishTime == 0) {
       this.shuffleFinishTime = finishTime; 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 05:53:52 2011
@@ -118,6 +118,7 @@ abstract public class Task implements Wr
   private String jobFile;                         // job configuration file
   private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
+  private TaskAttemptID taskIdForUmbilical;       // same, or uber's if subtask
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected JobStatus.State jobRunStateForCleanup;
@@ -159,8 +160,8 @@ abstract public class Task implements Wr
   ////////////////////////////////////////////
 
   public Task() {
-    taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
+    taskIdForUmbilical = taskId;
     spilledRecordsCounter = 
       counters.findCounter(TaskCounter.SPILLED_RECORDS);
     failedShuffleCounter = 
@@ -174,17 +175,10 @@ abstract public class Task implements Wr
               int numSlotsRequired) {
     this.jobFile = jobFile;
     this.taskId = taskId;
+    this.taskIdForUmbilical = taskId;
      
     this.partition = partition;
     this.numSlotsRequired = numSlotsRequired;
-    this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
-                                                  0.0f, numSlotsRequired, 
-                                                  TaskStatus.State.UNASSIGNED, 
-                                                  "", "", "", 
-                                                  isMapTask() ? 
-                                                    TaskStatus.Phase.MAP : 
-                                                    TaskStatus.Phase.SHUFFLE, 
-                                                  counters);
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
     failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
     mergedMapOutputsCounter = 
@@ -197,7 +191,12 @@ abstract public class Task implements Wr
   ////////////////////////////////////////////
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
+
   public TaskAttemptID getTaskID() { return taskId; }
+  public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) {
+    this.taskIdForUmbilical = taskIdForUmbilical;
+  }
+
   public int getNumSlotsRequired() {
     return numSlotsRequired;
   }
@@ -269,7 +268,7 @@ abstract public class Task implements Wr
   /**
    * Report a fatal error to the parent (task) tracker.
    */
-  protected void reportFatalError(TaskAttemptID id, Throwable throwable, 
+  protected void reportFatalError(Throwable throwable,
                                   String logMsg) {
     LOG.fatal(logMsg);
     Throwable tCause = throwable.getCause();
@@ -277,7 +276,7 @@ abstract public class Task implements Wr
                    ? StringUtils.stringifyException(throwable)
                    : StringUtils.stringifyException(tCause);
     try {
-      umbilical.fatalError(id, cause);
+      umbilical.fatalError(taskIdForUmbilical, cause);
     } catch (IOException ioe) {
       LOG.fatal("Failed to contact the tasktracker", ioe);
       System.exit(-1);
@@ -389,6 +388,14 @@ abstract public class Task implements Wr
     this.user = user;
   }
 
+  /**
+   * Return the task's MapOutputFile instance.
+   * @return the task's MapOutputFile instance
+   */
+  MapOutputFile getMapOutputFile() {
+    return mapOutputFile;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -396,6 +403,7 @@ abstract public class Task implements Wr
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, jobFile);
     taskId.write(out);
+    taskIdForUmbilical.write(out);
     out.writeInt(partition);
     out.writeInt(numSlotsRequired);
     taskStatus.write(out);
@@ -414,6 +422,7 @@ abstract public class Task implements Wr
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
+    taskIdForUmbilical = TaskAttemptID.read(in);
     partition = in.readInt();
     numSlotsRequired = in.readInt();
     taskStatus.readFields(in);
@@ -465,7 +474,7 @@ abstract public class Task implements Wr
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;
 
-  private transient Progress taskProgress = new Progress();
+  private transient Progress taskProgress = new Progress(); //GRR Q:  why transient?  lose entire tree every time serialize??
 
   // Current counters
   private transient Counters counters = new Counters();
@@ -475,6 +484,21 @@ abstract public class Task implements Wr
   
   public abstract boolean isMapTask();
 
+  /**
+   * Is this really a combo-task masquerading as a plain MapTask?
+   */
+  public abstract boolean isUberTask();
+
+  /**
+   * This setter allows one to incorporate Tasks into multiple levels of
+   * a Progress addPhase()-generated hierarchy (i.e., not always the root
+   * node), which in turn allows Progress to handle all details of progress
+   * aggregation for an UberTask or even a whole job.
+   */
+  protected void setProgress(Progress progress) {
+    taskProgress = progress;
+  }
+
   public Progress getProgress() { return taskProgress; }
 
   public void initialize(JobConf job, JobID id, 
@@ -518,7 +542,7 @@ abstract public class Task implements Wr
         resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
     }
   }
-  
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   protected class TaskReporter 
@@ -561,6 +585,7 @@ abstract public class Task implements Wr
       // indicate that progress update needs to be sent
       setProgressFlag();
     }
+    // FIXME?  why isn't this deprecated in favor of public setProgressFlag()?
     public void progress() {
       // indicate that progress update needs to be sent
       setProgressFlag();
@@ -603,15 +628,16 @@ abstract public class Task implements Wr
     }
     public InputSplit getInputSplit() throws UnsupportedOperationException {
       if (split == null) {
-        throw new UnsupportedOperationException("Input only available on map");
+        throw new UnsupportedOperationException("Input available only on map");
       } else {
         return split;
       }
     }  
     /** 
-     * The communication thread handles communication with the parent (Task Tracker). 
-     * It sends progress updates if progress has been made or if the task needs to 
-     * let the parent know that it's alive. It also pings the parent to see if it's alive. 
+     * The communication thread handles communication with the parent
+     * (TaskTracker). It sends progress updates if progress has been made or
+     * if the task needs to let the parent know that it's alive. It also pings
+     * the parent to see if it's alive. 
      */
     public void run() {
       final int MAX_RETRIES = 3;
@@ -647,8 +673,8 @@ abstract public class Task implements Wr
             taskFound = umbilical.ping(taskId);
           }
 
-          // if Task Tracker is not aware of our task ID (probably because it died and 
-          // came back up), kill ourselves
+          // if TaskTracker is not aware of our task ID (probably because it
+          // died and came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
             System.exit(66);
@@ -682,7 +708,7 @@ abstract public class Task implements Wr
       }
     }
   }
-  
+
   /**
    *  Reports the next executing record range to TaskTracker.
    *  
@@ -701,7 +727,7 @@ abstract public class Task implements Wr
     if (LOG.isDebugEnabled()) {
       LOG.debug("sending reportNextRecordRange " + range);
     }
-    umbilical.reportNextRecordRange(taskId, range);
+    umbilical.reportNextRecordRange(taskIdForUmbilical, range);
   }
 
   /**
@@ -837,8 +863,13 @@ abstract public class Task implements Wr
   public void done(TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
-    LOG.info("Task:" + taskId + " is done."
-             + " And is in the process of commiting");
+    if (isUberTask()) {
+      LOG.info("UberTask:" + taskIdForUmbilical + " subtask:" + taskId
+               + "is done and is in the process of committing.");
+    } else {
+      LOG.info("Task:" + taskId
+               + "is done and is in the process of committing.");
+    }
     updateCounters();
 
     boolean commitRequired = isCommitRequired();
@@ -848,7 +879,7 @@ abstract public class Task implements Wr
       // say the task tracker that task is commit pending
       while (true) {
         try {
-          umbilical.commitPending(taskId, taskStatus);
+          umbilical.commitPending(taskIdForUmbilical, taskStatus);
           break;
         } catch (InterruptedException ie) {
           // ignore
@@ -899,8 +930,14 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-          LOG.warn("Parent died.  Exiting "+taskId);
+        //GRR FIXME (later):  alternatives to taskIdForUmbilical would be
+        // (1) include taskId as part of umbilical object and protocol;
+        // (2) include taskId as part of taskStatus;
+        // (3) extend TaskAttemptID (or create related Task inner class?) to
+        //     include taskAttemptId() and taskAttemptIdForUmbilical() method
+        //     that's overridden in uber context [Dick]
+        if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) {
+          LOG.warn("Parent died.  Exiting " + taskId);
           System.exit(66);
         }
         taskStatus.clearStatus();
@@ -936,7 +973,7 @@ abstract public class Task implements Wr
    * @return -1 if it can't be found.
    */
    private long calculateOutputSize() throws IOException {
-    if (!isMapOrReduce()) {
+    if (!isMapOrReduce() || isUberTask()) {
       return -1;
     }
 
@@ -956,8 +993,13 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        umbilical.done(getTaskID());
-        LOG.info("Task '" + taskId + "' done.");
+        umbilical.done(taskIdForUmbilical);
+        if (isUberTask()) {
+          LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId
+                   + "' done.");
+        } else {
+          LOG.info("Task '" + taskId + "' done.");
+        }
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 
@@ -976,7 +1018,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        while (!umbilical.canCommit(taskId)) {
+        while (!umbilical.canCommit(taskIdForUmbilical)) {
           try {
             Thread.sleep(1000);
           } catch(InterruptedException ie) {
@@ -1033,7 +1075,7 @@ abstract public class Task implements Wr
     setPhase(TaskStatus.Phase.CLEANUP);
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
-    LOG.info("Runnning cleanup for the task");
+    LOG.info("Running cleanup for the task");
     // do the cleanup
     committer.abortTask(taskContext);
   }
@@ -1058,8 +1100,10 @@ abstract public class Task implements Wr
         oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
       }
     } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
-      LOG.info("Committing job");
-      committer.commitJob(jobContext);
+      // delete <outputdir>/_temporary and optionally create _SUCCESS file
+      if (!isUberTask()) {  // defer since output files have not yet been saved
+        commitJob();
+      }
     } else {
       throw new IOException("Invalid state of the job for cleanup. State found "
                             + jobRunStateForCleanup + " expecting "
@@ -1068,20 +1112,31 @@ abstract public class Task implements Wr
                             + JobStatus.State.KILLED);
     }
     
-    // delete the staging area for the job
+    // delete the staging area for the job (e.g.,
+    // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/
+    // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir)
     JobConf conf = new JobConf(jobContext.getConfiguration());
     if (!supportIsolationRunner(conf)) {
-      String jobTempDir = conf.get("mapreduce.job.dir");
-      Path jobTempDirPath = new Path(jobTempDir);
-      FileSystem fs = jobTempDirPath.getFileSystem(conf);
-      fs.delete(jobTempDirPath, true);
+      String jobStagingDir = conf.get("mapreduce.job.dir");
+      Path jobStagingDirPath = new Path(jobStagingDir);
+      FileSystem fs = jobStagingDirPath.getFileSystem(conf);
+      fs.delete(jobStagingDirPath, true);
+    }
+    // update counters, save any pending output files, shut down the progress-
+    // reporter communication thread and the umbilical, and mark the task done
+    if (!isUberTask()) {  // defer so UberTask can send TT final update(s)
+      done(umbilical, reporter);
     }
-    done(umbilical, reporter);
   }
-  
+
+  protected void commitJob() throws IOException {
+    LOG.info("Committing job");
+    committer.commitJob(jobContext);
+  }
+
   protected boolean supportIsolationRunner(JobConf conf) {
-    return (conf.getKeepTaskFilesPattern() != null || conf
-        .getKeepFailedTaskFiles());
+    return (conf.getKeepTaskFilesPattern() != null ||
+            conf.getKeepFailedTaskFiles());
   }
 
   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
@@ -1090,9 +1145,12 @@ abstract public class Task implements Wr
     // do the setup
     getProgress().setStatus("setup");
     committer.setupJob(jobContext);
-    done(umbilical, reporter);
+    if (!isUberTask()) {
+      // UberTask calls done() directly; don't shut down umbilical prematurely
+      done(umbilical, reporter);
+    }
   }
-  
+
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  8 05:53:52 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 import org.apache.hadoop.net.Node;
@@ -62,7 +62,7 @@ import org.apache.hadoop.net.Node;
  * **************************************************************
  */
 class TaskInProgress {
-  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently  
+  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
   int maxTaskAttempts = 4;    
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
@@ -75,14 +75,19 @@ class TaskInProgress {
 
   // Defines the TIP
   private String jobFile = null;
-  private TaskSplitMetaInfo splitInfo;
+  private TaskSplitMetaInfo[] splitInfo;
   private int numMaps;
+  private int numReduces;
   private int partition;
   private JobTracker jobtracker;
   private JobHistory jobHistory;
   private TaskID id;
   private JobInProgress job;
   private final int numSlotsRequired;
+  private boolean jobCleanup = false;
+  private boolean jobSetup = false;
+  private boolean isUber = false;
+  private boolean jobSetupCleanupNeeded = false;  // UberTasks only
 
   // Status of the TIP
   private int successEventNumber = -1;
@@ -100,8 +105,6 @@ class TaskInProgress {
   private long maxSkipRecords = 0;
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
-  private boolean jobCleanup = false; 
-  private boolean jobSetup = false;
 
   private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
   private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
@@ -169,7 +172,8 @@ class TaskInProgress {
                         JobInProgress job, int partition,
                         int numSlotsRequired) {
     this.jobFile = jobFile;
-    this.splitInfo = split;
+    this.splitInfo = new TaskSplitMetaInfo[1];
+    this.splitInfo[0] = split;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -183,7 +187,7 @@ class TaskInProgress {
     }
     this.user = job.getUser();
   }
-        
+
   /**
    * Constructor for ReduceTask
    */
@@ -265,7 +269,36 @@ class TaskInProgress {
       }
     }
   }
-  
+
+  /**
+   * Constructor for UberTask
+   */
+  public TaskInProgress(JobID jobid, String jobFile,
+                        TaskSplitMetaInfo[] splits,
+                        int numMaps, int numReduces,
+                        int partition, JobTracker jobtracker, JobConf conf,
+                        JobInProgress job, int numSlotsRequired,
+                        boolean jobSetupCleanupNeeded) {
+    this.isUber = true;
+    this.jobFile = jobFile;
+    this.splitInfo = splits;
+    this.numMaps = numMaps;
+    this.numReduces = numReduces;
+    this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
+    this.jobtracker = jobtracker;
+    this.job = job;
+    this.conf = conf;
+    this.partition = partition;
+    this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+    this.numSlotsRequired = numSlotsRequired;
+    setMaxTaskAttempts();
+    init(jobid);
+    if (jobtracker != null) {
+      this.jobHistory = jobtracker.getJobHistory();
+    }
+    this.user = job.getUser();
+  }
+
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
    */
@@ -276,15 +309,6 @@ class TaskInProgress {
       this.maxTaskAttempts = conf.getMaxReduceAttempts();
     }
   }
-    
-  /**
-   * Return the index of the tip within the job, so 
-   * "task_200707121733_1313_0002_m_012345" would return 12345;
-   * @return int the tip index
-   */
-  public int idWithinJob() {
-    return partition;
-  }    
 
   public boolean isJobCleanupTask() {
    return jobCleanup;
@@ -392,19 +416,32 @@ class TaskInProgress {
   public JobInProgress getJob() {
     return job;
   }
+
   /**
    * Return an ID for this task, not its component taskid-threads
    */
   public TaskID getTIPId() {
     return this.id;
   }
+
   /**
-   * Whether this is a map task
+   * Whether this is a map task.  Note that ubertasks return false here so
+   * they can run in a reduce slot (larger).  (Setup and cleanup tasks may
+   * return either true or false.)
    */
   public boolean isMapTask() {
-    return splitInfo != null;
+    return splitInfo != null && !isUber;
   }
-    
+
+  /**
+   * Whether this is an ubertask, i.e., a meta-task that contains a handful
+   * of map tasks and (at most) a single reduce task.  Note that ubertasks
+   * are seen as reduce tasks in most contexts.
+   */
+  public boolean isUberTask() {
+    return isUber;
+  }
+
   /**
    * Returns the {@link TaskType} of the {@link TaskAttemptID} passed. 
    * The type of an attempt is determined by the nature of the task and not its 
@@ -929,15 +966,16 @@ class TaskInProgress {
   }
 
   /**
-   * Get the split locations 
+   * Get the split locations
    */
   public String[] getSplitLocations() {
+//GRR FIXME?  may need to add "(  ..  || isUberTask())" if ever called for uber (but locations for which split?  all of them?)
     if (isMapTask() && !jobSetup && !jobCleanup) {
-      return splitInfo.getLocations();
+      return splitInfo[0].getLocations();
     }
     return new String[0];
   }
-  
+
   /**
    * Get the Status of the tasks managed by this TIP
    */
@@ -1120,7 +1158,7 @@ class TaskInProgress {
  
     //set this the first time we run a taskAttempt in this TIP
     //each Task attempt has its own TaskStatus, which tracks that
-    //attempts execStartTime, thus this startTime is TIP wide.
+    //attempt's execStartTime, thus this startTime is TIP wide.
     if (0 == execStartTime){
       setExecStartTime(lastDispatchTime);
     }
@@ -1132,8 +1170,9 @@ class TaskInProgress {
   }
   
   /**
-   * Adds a previously running task to this tip. This is used in case of 
-   * jobtracker restarts.
+   * Creates a task or recreates a previously running one and adds it to this
+   * tip. The latter is used in case of jobtracker restarts.  This is the
+   * ultimate source of Task objects in a normal Hadoop setup.
    */
   public Task addRunningTask(TaskAttemptID taskid, 
                              String taskTracker,
@@ -1147,10 +1186,20 @@ class TaskInProgress {
         LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
                   + failedRanges.getIndicesCount());
       }
-      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
-          numSlotsNeeded);
+      t = new MapTask(jobFile, taskid, partition,
+                      splitInfo[0].getSplitIndex(), numSlotsNeeded);
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+      if (isUberTask()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Launching actual UberTask (" + numMaps + " maps, "
+                    + numReduces + " reduces)");
+        }
+        // numMaps is implicit in size of splitIndex array:
+        t = new UberTask(jobFile, taskid, partition, getSplitIndexArray(),
+                         numReduces, numSlotsNeeded, jobSetupCleanupNeeded);
+      } else {
+        t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+      }
     }
     if (jobCleanup) {
       t.setJobCleanupTask();
@@ -1187,6 +1236,17 @@ class TaskInProgress {
     return t;
   }
 
+  // GRR FIXME?  more efficient just to pass splitInfo directly...any need for
+  //             rest of it in UberTask?
+  TaskSplitIndex[] getSplitIndexArray() {
+    int numSplits = splitInfo.length;
+    TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits];
+    for (int i = 0; i < numSplits; ++i) {
+      splitIndex[i] = splitInfo[i].getSplitIndex();
+    }
+    return splitIndex;
+  }
+
   boolean isRunningTask(TaskAttemptID taskid) {
     TaskStatus status = taskStatuses.get(taskid);
     return status != null && status.getRunState() == TaskStatus.State.RUNNING;
@@ -1236,7 +1296,8 @@ class TaskInProgress {
   }
     
   /**
-   * Get the id of this map or reduce task.
+   * Get the task index of this map or reduce task.  For example,
+   * "task_201011230308_87259_r_000240" would return 240.
    * @return The index of this tip in the maps/reduces lists.
    */
   public int getIdWithinJob() {
@@ -1256,7 +1317,7 @@ class TaskInProgress {
   public int getSuccessEventNumber() {
     return successEventNumber;
   }
-  
+
   /** 
    * Gets the Node list of input split locations sorted in rack order.
    */ 
@@ -1264,7 +1325,7 @@ class TaskInProgress {
     if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
-    String[] splits = splitInfo.getLocations();
+    String[] splits = splitInfo[0].getLocations();  // actually replicas
     Node[] nodes = new Node[splits.length];
     for (int i = 0; i < splits.length; i++) {
       nodes[i] = jobtracker.getNode(splits[i]);
@@ -1293,13 +1354,20 @@ class TaskInProgress {
   }
 
   public long getMapInputSize() {
-    if(isMapTask() && !jobSetup && !jobCleanup) {
-      return splitInfo.getInputDataLength();
+    if (isUberTask()) {
+      int numSplits = splitInfo.length;
+      long sumInputDataLength = 0;
+      for (int i = 0; i < numSplits; ++i) {
+        sumInputDataLength += splitInfo[i].getInputDataLength();
+      }
+      return sumInputDataLength;
+    } else if (isMapTask() && !jobSetup && !jobCleanup) {
+      return splitInfo[0].getInputDataLength();
     } else {
       return 0;
     }
   }
-  
+
   /**
    * Compare most recent task attempts dispatch time to current system time so
    * that task progress rate will slow down as time proceeds even if no progress

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar  8 05:53:52 2011
@@ -41,10 +41,15 @@ public abstract class TaskStatus impleme
   static final Log LOG =
     LogFactory.getLog(TaskStatus.class.getName());
   
-  //enumeration for reporting current phase of a task.
+  // what kind of TaskStatus is it?
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
+  public static enum Type {MAP, REDUCE, UBER}
+
+  // enumeration for reporting current phase of a task.
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static enum Phase {STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
 
   // what state is the task in?
   @InterfaceAudience.Private
@@ -105,6 +110,7 @@ public abstract class TaskStatus impleme
   
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
+  public abstract boolean getIsUber();
   public int getNumSlots() {
     return numSlots;
   }
@@ -495,46 +501,61 @@ public abstract class TaskStatus impleme
   //////////////////////////////////////////////////////////////////////////////
   // Factory-like methods to create/read/write appropriate TaskStatus objects
   //////////////////////////////////////////////////////////////////////////////
-  
-  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, 
-                                     float progress, int numSlots,
-                                     State runState, String diagnosticInfo,
-                                     String stateString, String taskTracker,
-                                     Phase phase, Counters counters) 
-  throws IOException {
-    boolean isMap = in.readBoolean();
-    return createTaskStatus(isMap, taskId, progress, numSlots, runState, 
-                            diagnosticInfo, stateString, taskTracker, phase, 
-                            counters);
-  }
-  
+
+  // this is the main one used by TT.TIP, JIP, and (apparently) all the
+  // relevant tests (CapacityTestUtils, TestFairScheduler, TestClusterStatus,
+  // TestJobInProgress, TestJobTrackerInstrumentation, FakeObjectUtilities)
+  // [also formerly MapTask and ReduceTask, but no longer]
   static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, 
                                      float progress, int numSlots,
                                      State runState, String diagnosticInfo,
                                      String stateString, String taskTracker,
                                      Phase phase, Counters counters) { 
     return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState, 
-                                       diagnosticInfo, stateString, taskTracker, 
+                                       diagnosticInfo, stateString, taskTracker,
                                        phase, counters) :
                      new ReduceTaskStatus(taskId, progress, numSlots, runState, 
                                           diagnosticInfo, stateString, 
                                           taskTracker, phase, counters);
   }
-  
-  static TaskStatus createTaskStatus(boolean isMap) {
-    return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
+
+  // used only in default ctors of Task (also formerly MapTask, ReduceTask) and
+  // readTaskStatus() below
+  static TaskStatus createTaskStatus(Type tsType) {
+    return (tsType == TaskStatus.Type.MAP)
+        ? new MapTaskStatus()
+        : (tsType == TaskStatus.Type.REDUCE)
+            ? new ReduceTaskStatus()
+            : new UberTaskStatus();
   }
 
   static TaskStatus readTaskStatus(DataInput in) throws IOException {
     boolean isMap = in.readBoolean();
-    TaskStatus taskStatus = createTaskStatus(isMap);
+    boolean isUber = in.readBoolean();
+    Type tsType = isMap
+        ? TaskStatus.Type.MAP
+        : isUber
+            ? TaskStatus.Type.UBER
+            : TaskStatus.Type.REDUCE;
+    TaskStatus taskStatus = createTaskStatus(tsType);
     taskStatus.readFields(in);
     return taskStatus;
   }
   
   static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
   throws IOException {
+/* LATER
+ *  //GRR FIXME:  longer-term, just store tsType as member var (but then need
+ *  //            to modify or add new ctor:  used in many places)
+ *  Type tsType = taskStatus.getIsUber()
+ *      ? TaskStatus.Type.UBER
+ *      : taskStatus.getIsMap()
+ *          ? TaskStatus.Type.MAP
+ *          : TaskStatus.Type.REDUCE;
+ *  WritableUtils.writeEnum(out, tsType); (or enum.ordinal() [as in MR-901] or ...)
+ */
     out.writeBoolean(taskStatus.getIsMap());
+    out.writeBoolean(taskStatus.getIsUber());
     taskStatus.write(out);
   }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  8 05:53:52 2011
@@ -827,7 +827,7 @@ public class TaskTracker 
           f = rjob.getFetchStatus();
           for (TaskInProgress tip : rjob.tasks) {
             Task task = tip.getTask();
-            if (!task.isMapTask()) {
+            if (!task.isMapTask() && !task.isUberTask()) {
               if (((ReduceTask)task).getPhase() == 
                   TaskStatus.Phase.SHUFFLE) {
                 if (rjob.getFetchStatus() == null) {
@@ -2502,21 +2502,27 @@ public class TaskTracker 
       this.lastProgressReport = System.currentTimeMillis();
       this.defaultJobConf = conf;
       localJobConf = null;
-      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
-                                               0.0f, 
-                                               task.getNumSlotsRequired(),
-                                               task.getState(),
-                                               diagnosticInfo.toString(), 
-                                               "initializing",  
-                                               getName(), 
-                                               task.isTaskCleanupTask() ? 
-                                                 TaskStatus.Phase.CLEANUP :  
-                                               task.isMapTask()? TaskStatus.Phase.MAP:
-                                               TaskStatus.Phase.SHUFFLE,
-                                               task.getCounters()); 
+      if (task.isUberTask()) {
+        taskStatus = new UberTaskStatus(
+            task.getTaskID(), 0.0f,
+            task.getNumSlotsRequired(), task.getState(),
+            diagnosticInfo.toString(), "initializing", getName(),
+            TaskStatus.Phase.MAP, task.getCounters());
+      } else {
+        taskStatus = TaskStatus.createTaskStatus(
+            task.isMapTask(), task.getTaskID(), 0.0f,
+            task.getNumSlotsRequired(), task.getState(),
+            diagnosticInfo.toString(), "initializing", getName(),
+            task.isTaskCleanupTask()
+              ? TaskStatus.Phase.CLEANUP
+              : task.isMapTask()
+                ? TaskStatus.Phase.MAP
+                : TaskStatus.Phase.SHUFFLE,
+            task.getCounters());
+      }
       taskTimeout = (10 * 60 * 1000);
     }
-        
+
     void localizeTask(Task task) throws IOException{
 
       FileSystem localFs = FileSystem.getLocal(fConf);
@@ -2633,7 +2639,7 @@ public class TaskTracker 
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
         LOG.info("Not launching task: " + task.getTaskID() + 
-            " since it's state is " + this.taskStatus.getRunState());
+            " since its state is " + this.taskStatus.getRunState());
       }
     }
 
@@ -3206,7 +3212,7 @@ public class TaskTracker 
     
     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
     if (!jvmManager.isJvmKnown(jvmId)) {
-      LOG.info("Killing unknown JVM " + jvmId);
+      LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME:  bug?  no (apparent) killing going on here...
       return new JvmTask(null, true);
     }
     RunningJob rjob = runningJobs.get(jvmId.getJobId());
@@ -3237,7 +3243,7 @@ public class TaskTracker 
   public synchronized boolean statusUpdate(TaskAttemptID taskid, 
                                               TaskStatus taskStatus) 
   throws IOException {
-    TaskInProgress tip = tasks.get(taskid);
+    TaskInProgress tip = tasks.get(taskid);  // TT.TIP, not TaskInProgress.java
     if (tip != null) {
       tip.reportProgress(taskStatus);
       myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
@@ -3685,17 +3691,15 @@ public class TaskTracker 
         }
         userName = rjob.jobConf.getUser();
       }
-      // Index file
-      Path indexFileName =
-          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-              userName, jobId, mapId)
-              + "/file.out.index", conf);
-
-      // Map-output file
-      Path mapOutputFileName =
-          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-              userName, jobId, mapId)
-              + "/file.out", conf);
+
+      // Map-output filename ("... /file.out")
+      StringBuilder sb = new StringBuilder();
+      sb.append(TaskTracker.getIntermediateOutputDir(userName, jobId, mapId))
+          .append("/").append(MapOutputFile.MAP_OUTPUT_FILENAME_STRING);
+      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
+      // Index filename ("... /file.out.index")
+      sb.append(MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+      Path indexFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
 
       /**
        * Read the index file to get the information about where the map-output

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar  8 05:53:52 2011
@@ -271,4 +271,19 @@ public interface MRJobConfig {
     "mapreduce.job.submithostname";
   public static final String JOB_SUBMITHOSTADDR =
     "mapreduce.job.submithostaddress";
+
+  public static final String JOB_UBERTASK_ENABLE =
+    "mapreduce.job.ubertask.enable";
+  public static final String JOB_UBERTASK_MAXMAPS =
+    "mapreduce.job.ubertask.maxmaps";
+  public static final String JOB_UBERTASK_MAXREDUCES =
+    "mapreduce.job.ubertask.maxreduces";
+  public static final String JOB_UBERTASK_MAXBYTES =
+    "mapreduce.job.ubertask.maxbytes";
+  public static final String UBERTASK_JAVA_OPTS =
+    "mapreduce.ubertask.child.java.opts";  // or mapreduce.uber.java.opts?
+  public static final String UBERTASK_ULIMIT =
+    "mapreduce.ubertask.child.ulimit";     // or mapreduce.uber.ulimit?
+  public static final String UBERTASK_ENV =
+    "mapreduce.ubertask.child.env";        // or mapreduce.uber.env?
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar  8 05:53:52 2011
@@ -64,6 +64,9 @@
           {"name": "launchTime", "type": "long"},
           {"name": "totalMaps", "type": "int"},
           {"name": "totalReduces", "type": "int"},
+          {"name": "isUber", "type": "boolean"},
+          {"name": "numUberSubMaps", "type": "int"},
+          {"name": "numUberSubReduces", "type": "int"},
           {"name": "jobStatus", "type": "string"}
       ]
      },

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Tue Mar  8 05:53:52 2011
@@ -526,6 +526,8 @@ public class HistoryViewer {
      int totalReduces = 0; 
      int totalCleanups = 0;
      int totalSetups = 0;
+    int numUberSubMaps = 0;
+    int numUberSubReduces = 0;
      int numFailedMaps = 0; 
      int numKilledMaps = 0;
      int numFailedReduces = 0; 
@@ -544,15 +546,20 @@ public class HistoryViewer {
      long cleanupFinished = 0;
      long setupStarted = 0;
      long setupFinished = 0;
-     
+    boolean isUber = false;
+
      /** Get total maps */
      public int getTotalMaps() { return totalMaps; } 
      /** Get total reduces */
      public int getTotalReduces() { return totalReduces; } 
-     /** Get number of clean up tasks */ 
+     /** Get number of cleanup tasks */ 
      public int getTotalCleanups() { return totalCleanups; }
-     /** Get number of set up tasks */
+     /** Get number of setup tasks */
      public int getTotalSetups() { return totalSetups; }
+    /** Get number of map subtasks within UberTask */
+    public int getNumUberSubMaps() { return numUberSubMaps; }
+    /** Get number of reduce subtasks within UberTask */
+    public int getNumUberSubReduces() { return numUberSubReduces; }
      /** Get number of failed maps */
      public int getNumFailedMaps() { return numFailedMaps; }
      /** Get number of killed maps */
@@ -567,11 +574,11 @@ public class HistoryViewer {
      public int getNumFailedCleanups() { return numFailedCleanups; }
      /** Get number of killed cleanup tasks */
      public int getNumKilledCleanups() { return numKilledCleanups; }
-     /** Get number of finished set up tasks */
+     /** Get number of finished setup tasks */
      public int getNumFinishedSetups() { return numFinishedSetups; }
-     /** Get number of failed set up tasks */
+     /** Get number of failed setup tasks */
      public int getNumFailedSetups() { return numFailedSetups; }
-     /** Get number of killed set up tasks */
+     /** Get number of killed setup tasks */
      public int getNumKilledSetups() { return numKilledSetups; }
      /** Get number of maps that were started */
      public long getMapStarted() { return mapStarted; } 
@@ -589,8 +596,10 @@ public class HistoryViewer {
      public long getSetupStarted() { return setupStarted; }
      /** Get number of setup tasks that finished */
      public long getSetupFinished() { return setupFinished; }
+    /** Get job's UberTask/non-UberTask status */
+    public boolean isUber() { return isUber; }
 
-     /** Create summary information for the parsed job */
+    /** Create summary information for the parsed job */
     public SummarizedJob(JobInfo job) {
       tasks = job.getAllTasks();
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Mar  8 05:53:52 2011
@@ -302,6 +302,9 @@ public class JobHistoryParser {
     info.launchTime = event.getLaunchTime();
     info.totalMaps = event.getTotalMaps();
     info.totalReduces = event.getTotalReduces();
+    info.isUber = event.getIsUber();
+    info.numUberSubMaps = event.getNumUberSubMaps();
+    info.numUberSubReduces = event.getNumUberSubReduces();
   }
 
   private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
@@ -334,6 +337,9 @@ public class JobHistoryParser {
     long launchTime;
     int totalMaps;
     int totalReduces;
+    boolean isUber;
+    int numUberSubMaps;
+    int numUberSubReduces;
     int failedMaps;
     int failedReduces;
     int finishedMaps;
@@ -352,8 +358,9 @@ public class JobHistoryParser {
      */
     public JobInfo() {
       submitTime = launchTime = finishTime = -1;
-      totalMaps = totalReduces = failedMaps = failedReduces = 0;
-      finishedMaps = finishedReduces = 0;
+      isUber = false;
+      totalMaps = totalReduces = numUberSubMaps = numUberSubReduces = 0;
+      failedMaps = failedReduces = finishedMaps = finishedReduces = 0;
       username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
       jobACLs = new HashMap<JobACL, AccessControlList>();
@@ -370,6 +377,7 @@ public class JobHistoryParser {
       System.out.println("PRIORITY: " + priority);
       System.out.println("TOTAL_MAPS: " + totalMaps);
       System.out.println("TOTAL_REDUCES: " + totalReduces);
+      //GRR FIXME:  add UBER_SUBMAPS and UBER_SUBREDUCES? (or only if isUber == true? coordinate with TaskInfo printAll() changes)
       System.out.println("MAP_COUNTERS:" + mapCounters.toString());
       System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
       System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
@@ -395,18 +403,24 @@ public class JobHistoryParser {
     public String getJobConfPath() { return jobConfPath; }
     /** Get the job launch time */
     public long getLaunchTime() { return launchTime; }
-    /** Get the total number of maps */
-    public long getTotalMaps() { return totalMaps; }
-    /** Get the total number of reduces */
-    public long getTotalReduces() { return totalReduces; }
+    /** Get the total number of "real" maps */
+    public int getTotalMaps() { return totalMaps; }
+    /** Get the total number of "real" reduces */
+    public int getTotalReduces() { return totalReduces; }
+    /** Was the job small enough to be converted to an UberTask? */
+    public boolean getIsUber() { return isUber; }
+    /** Get the number of sub-MapTasks within the UberTask */
+    public int getNumUberSubMaps() { return numUberSubMaps; }
+    /** Get the number of sub-ReduceTasks within the UberTask */
+    public int getNumUberSubReduces() { return numUberSubReduces; }
     /** Get the total number of failed maps */
-    public long getFailedMaps() { return failedMaps; }
+    public int getFailedMaps() { return failedMaps; }
     /** Get the number of failed reduces */
-    public long getFailedReduces() { return failedReduces; }
+    public int getFailedReduces() { return failedReduces; }
     /** Get the number of finished maps */
-    public long getFinishedMaps() { return finishedMaps; }
+    public int getFinishedMaps() { return finishedMaps; }
     /** Get the number of finished reduces */
-    public long getFinishedReduces() { return finishedReduces; }
+    public int getFinishedReduces() { return finishedReduces; }
     /** Get the job status */
     public String getJobStatus() { return jobStatus; }
     public String getErrorInfo() { return errorInfo; }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Tue Mar  8 05:53:52 2011
@@ -44,11 +44,15 @@ public class JobInitedEvent implements H
    * @param jobStatus
    */
   public JobInitedEvent(JobID id, long launchTime, int totalMaps,
-                        int totalReduces, String jobStatus) {
+                        int totalReduces, boolean isUber, int numUberSubMaps,
+                        int numUberSubReduces, String jobStatus) {
     datum.jobid = new Utf8(id.toString());
     datum.launchTime = launchTime;
     datum.totalMaps = totalMaps;
     datum.totalReduces = totalReduces;
+    datum.isUber = isUber;
+    datum.numUberSubMaps = numUberSubMaps;
+    datum.numUberSubReduces = numUberSubReduces;
     datum.jobStatus = new Utf8(jobStatus);
   }
 
@@ -61,10 +65,16 @@ public class JobInitedEvent implements H
   public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the launch time */
   public long getLaunchTime() { return datum.launchTime; }
-  /** Get the total number of maps */
+  /** Get the total number of "real" maps */
   public int getTotalMaps() { return datum.totalMaps; }
-  /** Get the total number of reduces */
+  /** Get the total number of "real" reduces */
   public int getTotalReduces() { return datum.totalReduces; }
+  /** Was the job small enough to be converted to an UberTask? */
+  public boolean getIsUber() { return datum.isUber; }
+  /** Get the number of sub-MapTasks within the UberTask */
+  public int getNumUberSubMaps() { return datum.numUberSubMaps; }
+  /** Get the number of sub-ReduceTasks within the UberTask */
+  public int getNumUberSubReduces() { return datum.numUberSubReduces; }
   /** Get the status */
   public String getStatus() { return datum.jobStatus.toString(); }
  /** Get the event type */

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Mar  8 05:53:52 2011
@@ -241,8 +241,12 @@ public class TestJobQueueTaskScheduler e
         }
 
         @Override
+        public boolean getIsUber() {
+          return t.isUberTask();
+        }
+
+        @Override
         public void addFetchFailedMap(TaskAttemptID mapTaskId) {
-          
         }
       };
       status.setRunState(TaskStatus.State.RUNNING);

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java Tue Mar  8 05:53:52 2011
@@ -103,7 +103,7 @@ public class TestTaskStatus {
     // check the default case
     String test = "hi";
     final int maxSize = 16;
-    TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null, 
+    TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null,
                                        null) {
       @Override
       protected int getMaxStringSize() {
@@ -118,6 +118,11 @@ public class TestTaskStatus {
       public boolean getIsMap() {
         return false;
       }
+
+      @Override
+      public boolean getIsUber() {
+        return false;
+      }
     };
     assertEquals("Small diagnostic info test failed", 
                  status.getDiagnosticInfo(), test);
@@ -198,6 +203,11 @@ public class TestTaskStatus {
       public boolean getIsMap() {
         return false;
       }
+
+      @Override
+      public boolean getIsUber() {
+        return false;
+      }
     };
     assertEquals("Large diagnostic info test failed", 
                 maxSize, status.getDiagnosticInfo().length());

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Tue Mar  8 05:53:52 2011
@@ -125,10 +125,15 @@ public class Job20LineHistoryEventEmitte
       String status = line.get("JOB_STATUS");
       String totalMaps = line.get("TOTAL_MAPS");
       String totalReduces = line.get("TOTAL_REDUCES");
+      // note:  UberTask playback not supported since uber data not yet logged
+      boolean isUber = false;
+      int numUberSubMaps = 0;
+      int numUberSubReduces = 0;
 
       if (launchTime != null && totalMaps != null && totalReduces != null) {
-        return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
-            .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
+        return new JobInitedEvent(jobID, Long.parseLong(launchTime),
+            Integer.parseInt(totalMaps), Integer.parseInt(totalReduces),
+            isUber, numUberSubMaps, numUberSubReduces, status);
       }
 
       return null;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp Tue Mar  8 05:53:52 2011
@@ -365,10 +365,16 @@
               "<th>Killed</th>" +
               "<th><a href=\"jobfailures.jsp?jobid=" + jobId + 
               "\">Failed/Killed<br>Task Attempts</a></th></tr>\n");
-    printTaskSummary(out, jobId, "map", status.mapProgress(), 
-                     job.getTasks(TaskType.MAP));
-    printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
-                     job.getTasks(TaskType.REDUCE));
+    if (job.getUberMode()) {
+      /* placeholder until true task- and attempt-level uber info available */
+      printTaskSummary(out, jobId, "uber", status.reduceProgress(),
+                       job.getTasks(TaskType.REDUCE));
+    } else {
+      printTaskSummary(out, jobId, "map", status.mapProgress(), 
+                       job.getTasks(TaskType.MAP));
+      printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
+                       job.getTasks(TaskType.REDUCE));
+    }
     out.print("</table>\n");
     
     %>
@@ -421,6 +427,7 @@
     %>
     </table>
 
+<%if (job.getTasks(TaskType.MAP).length > 0) { %>
 <hr>Map Completion Graph - 
 <%
 if("off".equals(request.getParameter("map.graph"))) {
@@ -442,10 +449,14 @@ if("off".equals(session.getAttribute("ma
        width="<%=TaskGraphServlet.width + 2 * TaskGraphServlet.xmargin%>" 
        height="<%=TaskGraphServlet.height + 3 * TaskGraphServlet.ymargin%>"
        style="width:100%" type="image/svg+xml" pluginspage="http://www.adobe.com/svg/viewer/install/" />
-<%}%>
+<%} }%>
 
-<%if(job.getTasks(TaskType.REDUCE).length > 0) { %>
+<%if (job.getTasks(TaskType.REDUCE).length > 0) { %>
+<%if (job.getUberMode()) { %>
+<hr>UberTask Completion Graph -
+<%} else { %>
 <hr>Reduce Completion Graph -
+<%}%>
 <%if("off".equals(session.getAttribute("reduce.graph"))) { %>
 <a href="/jobdetails.jsp?jobid=<%=jobId%>&refresh=<%=refresh%>&reduce.graph=on" > open </a>
 <%} else { %> 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp Tue Mar  8 05:53:52 2011
@@ -56,6 +56,12 @@
     if (job == null) {
       return;
     }
+    // MR-1220 FIXME (LATER):  to fully integrate uberization, need task-
+    // and attempt-level JobHistory/Avro/etc. changes to multiple Event
+    // types; instead, going with top-level hack for now
+    boolean isUber = job.getIsUber();
+    int numUberSubMaps = job.getNumUberSubMaps();
+    int numUberSubReduces = job.getNumUberSubReduces();
     if (job.getJobStatus().equals("FAILED")) 
       reasonforFailure = job.getErrorInfo();
 %>
@@ -90,7 +96,7 @@
 <center>
 <table border="2" cellpadding="5" cellspacing="2">
 <tr>
-<td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
+<td>Kind</td><td>Total Tasks (successful+<wbr>failed+<wbr>killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
 </tr>
 <tr>
 <td>Setup</td>
@@ -106,6 +112,10 @@
     <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getSetupFinished(), sj.getSetupStarted()) %></td>
 </tr>
 <tr>
+
+<%
+    if (!isUber) {
+%>
 <td>Map</td>
     <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=MAP&status=all">
         <%=sj.getTotalMaps()%></a></td>
@@ -131,6 +141,36 @@
     <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %></td>
     <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %></td>
 </tr>
+
+<%
+    } else /* isUber */ {
+%>
+
+<%-- this row's <tr> is already emitted just before the if (!isUber) test above; do not open a second one --%>
+<td>Uber</td>
+    <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=all">
+        <%=sj.getTotalReduces()%></a></td>
+    <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=SUCCEEDED">
+        <%=job.getFinishedReduces()%></a></td>
+    <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=FAILED">
+        <%=sj.getNumFailedReduces()%></a></td>
+    <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=KILLED">
+        <%=sj.getNumKilledReduces()%></a></td>
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %></td>
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %></td>
+</tr>
+<tr><td>Map subtasks</td>
+    <td colspan="6"><%=numUberSubMaps%></td>
+</tr>
+<tr>
+<td>Reduce subtasks</td>
+    <td colspan="6"><%=numUberSubReduces%></td>
+</tr>
+
+<%
+    }
+%>
+
 <tr>
 <td>Cleanup</td>
     <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=JOB_CLEANUP&status=all">



Mime
View raw message