hadoop-mapreduce-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1079193 [2/3] - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ contrib/mumak/src/java/org/apache/hadoop/mapred/ java/ ja...
Date: Tue, 08 Mar 2011 05:54:14 GMT
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar  8 05:54:13 2011
@@ -72,10 +72,8 @@ public class ReduceTask extends Task {
 
   private CompressionCodec codec;
 
-
   { 
     getProgress().setStatus("reduce"); 
-    setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
   }
 
   private Progress copyPhase;
@@ -121,14 +119,21 @@ public class ReduceTask extends Task {
 
   public ReduceTask() {
     super();
+    this.taskStatus = new ReduceTaskStatus();
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
                     int partition, int numMaps, int numSlotsRequired) {
     super(jobFile, taskId, partition, numSlotsRequired);
     this.numMaps = numMaps;
+    this.taskStatus = new ReduceTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
+                                           TaskStatus.State.UNASSIGNED,
+                                           "", "", "", TaskStatus.Phase.SHUFFLE,
+                                           getCounters());
   }
-  
+
   private CompressionCodec initCodec() {
     // check if map-outputs are to be compressed
     if (conf.getCompressMapOutput()) {
@@ -151,6 +156,45 @@ public class ReduceTask extends Task {
     return false;
   }
 
+  /**
+   * Is this really a combo-task masquerading as a plain ReduceTask?  Decidedly
+   * not.
+   */
+  @Override
+  public boolean isUberTask() {
+    return false;
+  }
+
+  /**
+   * Allow UberTask (or, potentially, JobInProgress or others) to set up a
+   * deeper Progress hierarchy even if run() is skipped.  If setProgress()
+   * is also needed, it should be called <I>before</I> createPhase() or else
+   * the sub-phases created here will be wiped out.
+   */
+  void createPhase(TaskStatus.Phase phaseType, String status) {
+    if (phaseType == TaskStatus.Phase.SHUFFLE) {
+      copyPhase = getProgress().addPhase(status);
+    } else if (phaseType == TaskStatus.Phase.SORT) {
+      sortPhase = getProgress().addPhase(status);
+    } else /* TaskStatus.Phase.REDUCE */ {
+      reducePhase = getProgress().addPhase(status);
+    }
+  }
+
+  /**
+   * Allow UberTask to traverse the deeper Progress hierarchy in case run() is
+   * skipped.
+   */
+  void completePhase(TaskStatus.Phase phaseType) {
+    if (phaseType == TaskStatus.Phase.SHUFFLE) {
+      copyPhase.complete();
+    } else if (phaseType == TaskStatus.Phase.SORT) {
+      sortPhase.complete();
+    } else /* TaskStatus.Phase.REDUCE */ {
+      reducePhase.complete();
+    }
+  }
+
   public int getNumMaps() { return numMaps; }
   
   /**
@@ -177,37 +221,39 @@ public class ReduceTask extends Task {
   }
   
   // Get the input files for the reducer.
-  private Path[] getMapFiles(FileSystem fs, boolean isLocal) 
+  static Path[] getMapFiles(ReduceTask reduce, FileSystem fs, boolean isLocal) 
   throws IOException {
     List<Path> fileList = new ArrayList<Path>();
     if (isLocal) {
       // for local jobs
-      for(int i = 0; i < numMaps; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i));
+      for (int i = 0; i < reduce.numMaps; ++i) {
+        fileList.add(reduce.mapOutputFile.getInputFile(i));
       }
     } else {
       // for non local jobs
-      for (FileStatus filestatus : mapOutputFilesOnDisk) {
+      for (FileStatus filestatus : reduce.mapOutputFilesOnDisk) {
         fileList.add(filestatus.getPath());
       }
     }
     return fileList.toArray(new Path[0]);
   }
 
-  private class ReduceValuesIterator<KEY,VALUE> 
+  private static class ReduceValuesIterator<KEY,VALUE> 
           extends ValuesIterator<KEY,VALUE> {
-    public ReduceValuesIterator (RawKeyValueIterator in,
+    ReduceTask reduce;
+    public ReduceValuesIterator (ReduceTask reduce, RawKeyValueIterator in,
                                  RawComparator<KEY> comparator, 
                                  Class<KEY> keyClass,
                                  Class<VALUE> valClass,
                                  Configuration conf, Progressable reporter)
       throws IOException {
       super(in, comparator, keyClass, valClass, conf, reporter);
+      this.reduce = reduce;
     }
 
     @Override
     public VALUE next() {
-      reduceInputValueCounter.increment(1);
+      reduce.reduceInputValueCounter.increment(1);
       return moveToNext();
     }
     
@@ -216,12 +262,13 @@ public class ReduceTask extends Task {
     }
     
     public void informReduceProgress() {
-      reducePhase.set(super.in.getProgress().getProgress()); // update progress
+      // update progress:
+      reduce.reducePhase.set(super.in.getProgress().getProgress());
       reporter.progress();
     }
   }
 
-  private class SkippingReduceValuesIterator<KEY,VALUE> 
+  private static class SkippingReduceValuesIterator<KEY,VALUE> 
      extends ReduceValuesIterator<KEY,VALUE> {
      private SkipRangeIterator skipIt;
      private TaskUmbilicalProtocol umbilical;
@@ -234,26 +281,27 @@ public class ReduceTask extends Task {
      private boolean toWriteSkipRecs;
      private boolean hasNext;
      private TaskReporter reporter;
-     
-     public SkippingReduceValuesIterator(RawKeyValueIterator in,
+
+     public SkippingReduceValuesIterator(ReduceTask reduce,
+         RawKeyValueIterator in,
          RawComparator<KEY> comparator, Class<KEY> keyClass,
          Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
          TaskUmbilicalProtocol umbilical) throws IOException {
-       super(in, comparator, keyClass, valClass, conf, reporter);
+       super(reduce, in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
-       this.skipGroupCounter = 
+       this.skipGroupCounter =
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
-       this.skipRecCounter = 
+       this.skipRecCounter =
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
-       this.toWriteSkipRecs = toWriteSkipRecs() &&  
+       this.toWriteSkipRecs = reduce.toWriteSkipRecs() &&
          SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
        this.valClass = valClass;
        this.reporter = reporter;
-       skipIt = getSkipRanges().skipRangeIterator();
+       skipIt = reduce.getSkipRanges().skipRangeIterator();
        mayBeSkip();
      }
-     
+
      public void nextKey() throws IOException {
        super.nextKey();
        mayBeSkip();
@@ -292,16 +340,16 @@ public class ReduceTask extends Task {
        }
        skipGroupCounter.increment(skip);
        skipRecCounter.increment(skipRec);
-       reportNextRecordRange(umbilical, grpIndex);
+       reduce.reportNextRecordRange(umbilical, grpIndex);
      }
      
      @SuppressWarnings("unchecked")
      private void writeSkippedRec(KEY key, VALUE value) throws IOException{
        if(skipWriter==null) {
-         Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
-         Path skipFile = new Path(skipDir, getTaskID().toString());
+         Path skipDir = SkipBadRecords.getSkipOutputPath(reduce.conf);
+         Path skipFile = new Path(skipDir, reduce.getTaskID().toString());
          skipWriter = SequenceFile.createWriter(
-               skipFile.getFileSystem(conf), conf, skipFile,
+               skipFile.getFileSystem(reduce.conf), reduce.conf, skipFile,
                keyClass, valClass, 
                CompressionType.BLOCK, reporter);
        }
@@ -363,8 +411,8 @@ public class ReduceTask extends Task {
     } else {
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();
       rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-                           job.getMapOutputValueClass(), codec, 
-                           getMapFiles(rfs, true),
+                           job.getMapOutputValueClass(), codec,
+                           getMapFiles(this, rfs, true),
                            !conf.getKeepFailedTaskFiles(), 
                            job.getInt(JobContext.IO_SORT_FACTOR, 100),
                            new Path(getTaskID().toString()), 
@@ -382,18 +430,40 @@ public class ReduceTask extends Task {
     RawComparator comparator = job.getOutputValueGroupingComparator();
 
     if (useNewApi) {
-      runNewReducer(job, umbilical, reporter, rIter, comparator, 
+      runNewReducer(this, job, umbilical, reporter, rIter, comparator,
                     keyClass, valueClass);
     } else {
-      runOldReducer(job, umbilical, reporter, rIter, comparator, 
+      runOldReducer(this, job, umbilical, reporter, rIter, comparator,
                     keyClass, valueClass);
     }
     done(umbilical, reporter);
   }
 
+  private static class WrappedOutputCollector<OUTKEY, OUTVALUE>
+  implements OutputCollector<OUTKEY, OUTVALUE> {
+    RecordWriter<OUTKEY, OUTVALUE> out;
+    TaskReporter reporter;
+    Counters.Counter reduceOutputCounter;
+    public WrappedOutputCollector(ReduceTask reduce,
+                                  RecordWriter<OUTKEY, OUTVALUE> out,
+                                  TaskReporter reporter) {
+      this.out = out;
+      this.reporter = reporter;
+      this.reduceOutputCounter = reduce.reduceOutputCounter;
+    }
+
+    public void collect(OUTKEY key, OUTVALUE value)
+    throws IOException {
+      out.write(key, value);
+      reduceOutputCounter.increment(1);
+      // indicate that progress update needs to be sent
+      reporter.progress();
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  void runOldReducer(JobConf job,
+  static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runOldReducer(ReduceTask reduce, JobConf job,
                      TaskUmbilicalProtocol umbilical,
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
@@ -403,7 +473,7 @@ public class ReduceTask extends Task {
     Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
       ReflectionUtils.newInstance(job.getReducerClass(), job);
     // make output collector
-    String finalName = getOutputName(getPartition());
+    String finalName = getOutputName(reduce.getPartition());
 
     FileSystem fs = FileSystem.get(job);
 
@@ -411,15 +481,7 @@ public class ReduceTask extends Task {
       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
     
     OutputCollector<OUTKEY,OUTVALUE> collector = 
-      new OutputCollector<OUTKEY,OUTVALUE>() {
-        public void collect(OUTKEY key, OUTVALUE value)
-          throws IOException {
-          out.write(key, value);
-          reduceOutputCounter.increment(1);
-          // indicate that progress update needs to be sent
-          reporter.progress();
-        }
-      };
+      new WrappedOutputCollector<OUTKEY, OUTVALUE>(reduce, out, reporter);
     
     // apply reduce function
     try {
@@ -427,16 +489,16 @@ public class ReduceTask extends Task {
       boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
         SkipBadRecords.getAutoIncrReducerProcCount(job);
       
-      ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
-          new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
+      ReduceValuesIterator<INKEY,INVALUE> values = reduce.isSkipping() ?
+          new SkippingReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
               comparator, keyClass, valueClass, 
               job, reporter, umbilical) :
-          new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
+          new ReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
           job.getOutputValueGroupingComparator(), keyClass, valueClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
-        reduceInputKeyCounter.increment(1);
+        reduce.reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
         if(incrProcCount) {
           reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
@@ -449,7 +511,7 @@ public class ReduceTask extends Task {
       //Clean up: repeated in catch block below
       reducer.close();
       out.close(reporter);
-      //End of clean up.
+      //End of cleanup.
     } catch (IOException ioe) {
       try {
         reducer.close();
@@ -487,9 +549,38 @@ public class ReduceTask extends Task {
     }
   }
 
+  private static class WrappedRawKeyValueIterator implements RawKeyValueIterator {
+    ReduceTask reduce;
+    TaskReporter reporter;
+    RawKeyValueIterator rawIter;
+    public WrappedRawKeyValueIterator(ReduceTask reduce, TaskReporter reporter,
+                          RawKeyValueIterator rawIter) {
+      this.reduce = reduce;
+      this.rawIter = rawIter;
+      this.reporter = reporter;
+    }
+    public void close() throws IOException {
+      rawIter.close();
+    }
+    public DataInputBuffer getKey() throws IOException {
+      return rawIter.getKey();
+    }
+    public Progress getProgress() {
+      return rawIter.getProgress();
+    }
+    public DataInputBuffer getValue() throws IOException {
+      return rawIter.getValue();
+    }
+    public boolean next() throws IOException {
+      boolean ret = rawIter.next();
+      reporter.setProgress(rawIter.getProgress().getProgress());
+      return ret;
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  void runNewReducer(JobConf job,
+  static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runNewReducer(final ReduceTask reduce, JobConf job,
                      final TaskUmbilicalProtocol umbilical,
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
@@ -498,49 +589,29 @@ public class ReduceTask extends Task {
                      Class<INVALUE> valueClass
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
+    org.apache.hadoop.mapreduce.TaskAttemptID reduceId = reduce.getTaskID();
     // wrap value iterator to report progress.
     final RawKeyValueIterator rawIter = rIter;
-    rIter = new RawKeyValueIterator() {
-      public void close() throws IOException {
-        rawIter.close();
-      }
-      public DataInputBuffer getKey() throws IOException {
-        return rawIter.getKey();
-      }
-      public Progress getProgress() {
-        return rawIter.getProgress();
-      }
-      public DataInputBuffer getValue() throws IOException {
-        return rawIter.getValue();
-      }
-      public boolean next() throws IOException {
-        boolean ret = rawIter.next();
-        reporter.setProgress(rawIter.getProgress().getProgress());
-        return ret;
-      }
-    };
+    rIter = new WrappedRawKeyValueIterator(reduce, reporter, rawIter);
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, reduceId);
     // make a reducer
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
-        outputFormat.getRecordWriter(taskContext);
+        reduce.outputFormat.getRecordWriter(taskContext);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
-      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
-    job.setBoolean("mapred.skip.on", isSkipping());
-    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
-    org.apache.hadoop.mapreduce.Reducer.Context 
-         reducerContext = createReduceContext(reducer, job, getTaskID(),
-                                               rIter, reduceInputKeyCounter, 
-                                               reduceInputValueCounter, 
-                                               trackedRW,
-                                               committer,
-                                               reporter, comparator, keyClass,
-                                               valueClass);
+      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduce.reduceOutputCounter);
+    job.setBoolean(JobContext.SKIP_RECORDS, reduce.isSkipping());
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+      createReduceContext(reducer, job, reduceId, rIter,
+                          reduce.reduceInputKeyCounter, 
+                          reduce.reduceInputValueCounter, 
+                          trackedRW, reduce.committer, reporter,
+                          comparator, keyClass, valueClass);
     reducer.run(reducerContext);
     output.close(reducerContext);
   }
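
The createPhase()/completePhase() accessors above exist so an UberTask can hang each subtask's top-level phase off its own Progress root rather than the subtask's (see UberTask.run() later in this commit). A minimal standalone sketch of that two-level tree, assuming the stock org.apache.hadoop.util.Progress semantics in which complete() on a child advances the parent past that phase:

    import org.apache.hadoop.util.Progress;

    public class ProgressTreeSketch {
      public static void main(String[] args) {
        Progress root = new Progress();          // plays the role of Task.getProgress()
        root.setStatus("uber");
        Progress map1 = root.addPhase("map 1");  // what createPhase() does per subtask
        Progress red1 = root.addPhase("reduce 1");

        map1.set(0.5f);                          // subtask reports partial progress
        map1.complete();                         // what completePhase() does
        System.out.println(root.getProgress());  // 0.5: first of two equal phases done
        red1.complete();
        System.out.println(root.getProgress());  // 1.0: all phases done
      }
    }

This is also why the javadoc warns that setProgress() must be called before createPhase(): replacing taskProgress afterwards would orphan the sub-phases just added.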

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Mar  8 05:54:13 2011
@@ -54,6 +54,11 @@ class ReduceTaskStatus extends TaskStatu
   }
 
   @Override
+  public boolean getIsUber() {
+    return false;
+  }
+
+  @Override
   void setFinishTime(long finishTime) {
     if (shuffleFinishTime == 0) {
       this.shuffleFinishTime = finishTime; 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 05:54:13 2011
@@ -118,6 +118,7 @@ abstract public class Task implements Wr
   private String jobFile;                         // job configuration file
   private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
+  private TaskAttemptID taskIdForUmbilical;       // same, or uber's if subtask
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected JobStatus.State jobRunStateForCleanup;
@@ -159,8 +160,8 @@ abstract public class Task implements Wr
   ////////////////////////////////////////////
 
   public Task() {
-    taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
+    taskIdForUmbilical = taskId;
     spilledRecordsCounter = 
       counters.findCounter(TaskCounter.SPILLED_RECORDS);
     failedShuffleCounter = 
@@ -174,17 +175,10 @@ abstract public class Task implements Wr
               int numSlotsRequired) {
     this.jobFile = jobFile;
     this.taskId = taskId;
+    this.taskIdForUmbilical = taskId;
      
     this.partition = partition;
     this.numSlotsRequired = numSlotsRequired;
-    this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
-                                                  0.0f, numSlotsRequired, 
-                                                  TaskStatus.State.UNASSIGNED, 
-                                                  "", "", "", 
-                                                  isMapTask() ? 
-                                                    TaskStatus.Phase.MAP : 
-                                                    TaskStatus.Phase.SHUFFLE, 
-                                                  counters);
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
     failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
     mergedMapOutputsCounter = 
@@ -197,7 +191,12 @@ abstract public class Task implements Wr
   ////////////////////////////////////////////
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
+
   public TaskAttemptID getTaskID() { return taskId; }
+  public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) {
+    this.taskIdForUmbilical = taskIdForUmbilical;
+  }
+
   public int getNumSlotsRequired() {
     return numSlotsRequired;
   }
@@ -269,7 +268,7 @@ abstract public class Task implements Wr
   /**
    * Report a fatal error to the parent (task) tracker.
    */
-  protected void reportFatalError(TaskAttemptID id, Throwable throwable, 
+  protected void reportFatalError(Throwable throwable,
                                   String logMsg) {
     LOG.fatal(logMsg);
     Throwable tCause = throwable.getCause();
@@ -277,7 +276,7 @@ abstract public class Task implements Wr
                    ? StringUtils.stringifyException(throwable)
                    : StringUtils.stringifyException(tCause);
     try {
-      umbilical.fatalError(id, cause);
+      umbilical.fatalError(taskIdForUmbilical, cause);
     } catch (IOException ioe) {
       LOG.fatal("Failed to contact the tasktracker", ioe);
       System.exit(-1);
@@ -389,6 +388,14 @@ abstract public class Task implements Wr
     this.user = user;
   }
 
+  /**
+   * Return the task's MapOutputFile instance.
+   * @return the task's MapOutputFile instance
+   */
+  MapOutputFile getMapOutputFile() {
+    return mapOutputFile;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -396,6 +403,7 @@ abstract public class Task implements Wr
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, jobFile);
     taskId.write(out);
+    taskIdForUmbilical.write(out);
     out.writeInt(partition);
     out.writeInt(numSlotsRequired);
     taskStatus.write(out);
@@ -414,6 +422,7 @@ abstract public class Task implements Wr
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
+    taskIdForUmbilical = TaskAttemptID.read(in);
     partition = in.readInt();
     numSlotsRequired = in.readInt();
     taskStatus.readFields(in);
@@ -465,7 +474,7 @@ abstract public class Task implements Wr
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;
 
-  private transient Progress taskProgress = new Progress();
+  private transient Progress taskProgress = new Progress(); //GRR Q:  why transient?  lose entire tree every time serialize??
 
   // Current counters
   private transient Counters counters = new Counters();
@@ -475,6 +484,21 @@ abstract public class Task implements Wr
   
   public abstract boolean isMapTask();
 
+  /**
+   * Is this really a combo-task masquerading as a plain MapTask?
+   */
+  public abstract boolean isUberTask();
+
+  /**
+   * This setter allows one to incorporate Tasks into multiple levels of
+   * a Progress addPhase()-generated hierarchy (i.e., not always the root
+   * node), which in turn allows Progress to handle all details of progress
+   * aggregation for an UberTask or even a whole job.
+   */
+  protected void setProgress(Progress progress) {
+    taskProgress = progress;
+  }
+
   public Progress getProgress() { return taskProgress; }
 
   public void initialize(JobConf job, JobID id, 
@@ -518,7 +542,7 @@ abstract public class Task implements Wr
         resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
     }
   }
-  
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   protected class TaskReporter 
@@ -561,6 +585,7 @@ abstract public class Task implements Wr
       // indicate that progress update needs to be sent
       setProgressFlag();
     }
+    // FIXME?  why isn't this deprecated in favor of public setProgressFlag()?
     public void progress() {
       // indicate that progress update needs to be sent
       setProgressFlag();
@@ -603,15 +628,16 @@ abstract public class Task implements Wr
     }
     public InputSplit getInputSplit() throws UnsupportedOperationException {
       if (split == null) {
-        throw new UnsupportedOperationException("Input only available on map");
+        throw new UnsupportedOperationException("Input available only on map");
       } else {
         return split;
       }
     }  
     /** 
-     * The communication thread handles communication with the parent (Task Tracker). 
-     * It sends progress updates if progress has been made or if the task needs to 
-     * let the parent know that it's alive. It also pings the parent to see if it's alive. 
+     * The communication thread handles communication with the parent
+     * (TaskTracker). It sends progress updates if progress has been made or
+     * if the task needs to let the parent know that it's alive. It also pings
+     * the parent to see if it's alive. 
      */
     public void run() {
       final int MAX_RETRIES = 3;
@@ -647,8 +673,8 @@ abstract public class Task implements Wr
             taskFound = umbilical.ping(taskId);
           }
 
-          // if Task Tracker is not aware of our task ID (probably because it died and 
-          // came back up), kill ourselves
+          // if TaskTracker is not aware of our task ID (probably because it
+          // died and came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
             System.exit(66);
@@ -682,7 +708,7 @@ abstract public class Task implements Wr
       }
     }
   }
-  
+
   /**
    *  Reports the next executing record range to TaskTracker.
    *  
@@ -701,7 +727,7 @@ abstract public class Task implements Wr
     if (LOG.isDebugEnabled()) {
       LOG.debug("sending reportNextRecordRange " + range);
     }
-    umbilical.reportNextRecordRange(taskId, range);
+    umbilical.reportNextRecordRange(taskIdForUmbilical, range);
   }
 
   /**
@@ -837,8 +863,13 @@ abstract public class Task implements Wr
   public void done(TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
-    LOG.info("Task:" + taskId + " is done."
-             + " And is in the process of commiting");
+    if (isUberTask()) {
+      LOG.info("UberTask:" + taskIdForUmbilical + " subtask:" + taskId
+               + "is done and is in the process of committing.");
+    } else {
+      LOG.info("Task:" + taskId
+               + "is done and is in the process of committing.");
+    }
     updateCounters();
 
     boolean commitRequired = isCommitRequired();
@@ -848,7 +879,7 @@ abstract public class Task implements Wr
       // tell the task tracker that the task is commit pending
       while (true) {
         try {
-          umbilical.commitPending(taskId, taskStatus);
+          umbilical.commitPending(taskIdForUmbilical, taskStatus);
           break;
         } catch (InterruptedException ie) {
           // ignore
@@ -899,8 +930,14 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-          LOG.warn("Parent died.  Exiting "+taskId);
+        //GRR FIXME (later):  alternatives to taskIdForUmbilical would be
+        // (1) include taskId as part of umbilical object and protocol;
+        // (2) include taskId as part of taskStatus
+        // (3) extend TaskAttemptID (or create related Task inner class?) to
+        //     include taskAttemptId() and taskAttemptIdForUmbilical() method
+        //     that's overridden in uber context [Dick]
+        if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) {
+          LOG.warn("Parent died.  Exiting " + taskId);
           System.exit(66);
         }
         taskStatus.clearStatus();
@@ -936,7 +973,7 @@ abstract public class Task implements Wr
    * @return -1 if it can't be found.
    */
    private long calculateOutputSize() throws IOException {
-    if (!isMapOrReduce()) {
+    if (!isMapOrReduce() || isUberTask()) {
       return -1;
     }
 
@@ -956,8 +993,13 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        umbilical.done(getTaskID());
-        LOG.info("Task '" + taskId + "' done.");
+        umbilical.done(taskIdForUmbilical);
+        if (isUberTask()) {
+          LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId
+                   + "' done.");
+        } else {
+          LOG.info("Task '" + taskId + "' done.");
+        }
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 
@@ -976,7 +1018,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        while (!umbilical.canCommit(taskId)) {
+        while (!umbilical.canCommit(taskIdForUmbilical)) {
           try {
             Thread.sleep(1000);
           } catch(InterruptedException ie) {
@@ -1033,7 +1075,7 @@ abstract public class Task implements Wr
     setPhase(TaskStatus.Phase.CLEANUP);
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
-    LOG.info("Runnning cleanup for the task");
+    LOG.info("Running cleanup for the task");
     // do the cleanup
     committer.abortTask(taskContext);
   }
@@ -1058,8 +1100,10 @@ abstract public class Task implements Wr
         oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
       }
     } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
-      LOG.info("Committing job");
-      committer.commitJob(jobContext);
+      // delete <outputdir>/_temporary and optionally create _SUCCESS file
+      if (!isUberTask()) {  // defer since output files have not yet been saved
+        commitJob();
+      }
     } else {
       throw new IOException("Invalid state of the job for cleanup. State found "
                             + jobRunStateForCleanup + " expecting "
@@ -1068,20 +1112,31 @@ abstract public class Task implements Wr
                             + JobStatus.State.KILLED);
     }
     
-    // delete the staging area for the job
+    // delete the staging area for the job (e.g.,
+    // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/
+    // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir)
     JobConf conf = new JobConf(jobContext.getConfiguration());
     if (!supportIsolationRunner(conf)) {
-      String jobTempDir = conf.get("mapreduce.job.dir");
-      Path jobTempDirPath = new Path(jobTempDir);
-      FileSystem fs = jobTempDirPath.getFileSystem(conf);
-      fs.delete(jobTempDirPath, true);
+      String jobStagingDir = conf.get("mapreduce.job.dir");
+      Path jobStagingDirPath = new Path(jobStagingDir);
+      FileSystem fs = jobStagingDirPath.getFileSystem(conf);
+      fs.delete(jobStagingDirPath, true);
+    }
+    // update counters, save any pending output files, shut down the progress-
+    // reporter communication thread and the umbilical, and mark the task done
+    if (!isUberTask()) {  // defer so UberTask can send TT final update(s)
+      done(umbilical, reporter);
     }
-    done(umbilical, reporter);
   }
-  
+
+  protected void commitJob() throws IOException {
+    LOG.info("Committing job");
+    committer.commitJob(jobContext);
+  }
+
   protected boolean supportIsolationRunner(JobConf conf) {
-    return (conf.getKeepTaskFilesPattern() != null || conf
-        .getKeepFailedTaskFiles());
+    return (conf.getKeepTaskFilesPattern() != null ||
+            conf.getKeepFailedTaskFiles());
   }
 
   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
@@ -1090,9 +1145,12 @@ abstract public class Task implements Wr
     // do the setup
     getProgress().setStatus("setup");
     committer.setupJob(jobContext);
-    done(umbilical, reporter);
+    if (!isUberTask()) {
+      // UberTask calls done() directly; don't shut down umbilical prematurely
+      done(umbilical, reporter);
+    }
   }
-  
+
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
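
A compatibility note on the Writable changes above: write() and readFields() now put taskIdForUmbilical on the wire immediately after taskId, so the TaskTracker and child JVM must both be at this revision. A sketch of just that framing, round-tripping the two IDs the same way Task does (no-arg TaskAttemptID as in Task's default constructor):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.mapred.TaskAttemptID;

    public class TaskIdWireSketch {
      public static void main(String[] args) throws IOException {
        TaskAttemptID taskId = new TaskAttemptID();
        TaskAttemptID taskIdForUmbilical = taskId;  // same, or the uber-task's id

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        taskId.write(out);                           // as in Task.write()
        taskIdForUmbilical.write(out);

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(TaskAttemptID.read(in));  // as in Task.readFields()
        System.out.println(TaskAttemptID.read(in));
      }
    }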

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  8 05:54:13 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 import org.apache.hadoop.net.Node;
@@ -62,7 +62,7 @@ import org.apache.hadoop.net.Node;
  * **************************************************************
  */
 class TaskInProgress {
-  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently  
+  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
   int maxTaskAttempts = 4;    
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
@@ -75,14 +75,19 @@ class TaskInProgress {
 
   // Defines the TIP
   private String jobFile = null;
-  private TaskSplitMetaInfo splitInfo;
+  private TaskSplitMetaInfo[] splitInfo;
   private int numMaps;
+  private int numReduces;
   private int partition;
   private JobTracker jobtracker;
   private JobHistory jobHistory;
   private TaskID id;
   private JobInProgress job;
   private final int numSlotsRequired;
+  private boolean jobCleanup = false;
+  private boolean jobSetup = false;
+  private boolean isUber = false;
+  private boolean jobSetupCleanupNeeded = false;  // UberTasks only
 
   // Status of the TIP
   private int successEventNumber = -1;
@@ -100,8 +105,6 @@ class TaskInProgress {
   private long maxSkipRecords = 0;
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
-  private boolean jobCleanup = false; 
-  private boolean jobSetup = false;
 
   private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
   private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
@@ -169,7 +172,8 @@ class TaskInProgress {
                         JobInProgress job, int partition,
                         int numSlotsRequired) {
     this.jobFile = jobFile;
-    this.splitInfo = split;
+    this.splitInfo = new TaskSplitMetaInfo[1];
+    this.splitInfo[0] = split;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -183,7 +187,7 @@ class TaskInProgress {
     }
     this.user = job.getUser();
   }
-        
+
   /**
    * Constructor for ReduceTask
    */
@@ -265,7 +269,36 @@ class TaskInProgress {
       }
     }
   }
-  
+
+  /**
+   * Constructor for UberTask
+   */
+  public TaskInProgress(JobID jobid, String jobFile,
+                        TaskSplitMetaInfo[] splits,
+                        int numMaps, int numReduces,
+                        int partition, JobTracker jobtracker, JobConf conf,
+                        JobInProgress job, int numSlotsRequired,
+                        boolean jobSetupCleanupNeeded) {
+    this.isUber = true;
+    this.jobFile = jobFile;
+    this.splitInfo = splits;
+    this.numMaps = numMaps;
+    this.numReduces = numReduces;
+    this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
+    this.jobtracker = jobtracker;
+    this.job = job;
+    this.conf = conf;
+    this.partition = partition;
+    this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+    this.numSlotsRequired = numSlotsRequired;
+    setMaxTaskAttempts();
+    init(jobid);
+    if (jobtracker != null) {
+      this.jobHistory = jobtracker.getJobHistory();
+    }
+    this.user = job.getUser();
+  }
+
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
    */
@@ -276,15 +309,6 @@ class TaskInProgress {
       this.maxTaskAttempts = conf.getMaxReduceAttempts();
     }
   }
-    
-  /**
-   * Return the index of the tip within the job, so 
-   * "task_200707121733_1313_0002_m_012345" would return 12345;
-   * @return int the tip index
-   */
-  public int idWithinJob() {
-    return partition;
-  }    
 
   public boolean isJobCleanupTask() {
    return jobCleanup;
@@ -392,19 +416,32 @@ class TaskInProgress {
   public JobInProgress getJob() {
     return job;
   }
+
   /**
    * Return an ID for this task, not its component taskid-threads
    */
   public TaskID getTIPId() {
     return this.id;
   }
+
   /**
-   * Whether this is a map task
+   * Whether this is a map task.  Note that ubertasks return false here so
+   * they can run in a reduce slot (larger).  (Setup and cleanup tasks may
+   * return either true or false.)
    */
   public boolean isMapTask() {
-    return splitInfo != null;
+    return splitInfo != null && !isUber;
   }
-    
+
+  /**
+   * Whether this is an ubertask, i.e., a meta-task that contains a handful
+   * of map tasks and (at most) a single reduce task.  Note that ubertasks
+   * are seen as reduce tasks in most contexts.
+   */
+  public boolean isUberTask() {
+    return isUber;
+  }
+
   /**
    * Returns the {@link TaskType} of the {@link TaskAttemptID} passed. 
    * The type of an attempt is determined by the nature of the task and not its 
@@ -929,15 +966,16 @@ class TaskInProgress {
   }
 
   /**
-   * Get the split locations 
+   * Get the split locations
    */
   public String[] getSplitLocations() {
+//GRR FIXME?  may need to add "(  ..  || isUberTask())" if ever called for uber (but locations for which split?  all of them?)
     if (isMapTask() && !jobSetup && !jobCleanup) {
-      return splitInfo.getLocations();
+      return splitInfo[0].getLocations();
     }
     return new String[0];
   }
-  
+
   /**
    * Get the Status of the tasks managed by this TIP
    */
@@ -1120,7 +1158,7 @@ class TaskInProgress {
  
     //set this the first time we run a taskAttempt in this TIP
     //each Task attempt has its own TaskStatus, which tracks that
-    //attempts execStartTime, thus this startTime is TIP wide.
+    //attempt's execStartTime, thus this startTime is TIP wide.
     if (0 == execStartTime){
       setExecStartTime(lastDispatchTime);
     }
@@ -1132,8 +1170,9 @@ class TaskInProgress {
   }
   
   /**
-   * Adds a previously running task to this tip. This is used in case of 
-   * jobtracker restarts.
+   * Creates a task or recreates a previously running one and adds it to this
+   * tip. The latter is used in case of jobtracker restarts.  This is the
+   * ultimate source of Task objects in a normal Hadoop setup.
    */
   public Task addRunningTask(TaskAttemptID taskid, 
                              String taskTracker,
@@ -1147,10 +1186,20 @@ class TaskInProgress {
         LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
                   + failedRanges.getIndicesCount());
       }
-      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
-          numSlotsNeeded);
+      t = new MapTask(jobFile, taskid, partition,
+                      splitInfo[0].getSplitIndex(), numSlotsNeeded);
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+      if (isUberTask()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Launching actual UberTask (" + numMaps + " maps, "
+                    + numReduces + " reduces)");
+        }
+        // numMaps is implicit in size of splitIndex array:
+        t = new UberTask(jobFile, taskid, partition, getSplitIndexArray(),
+                         numReduces, numSlotsNeeded, jobSetupCleanupNeeded);
+      } else {
+        t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+      }
     }
     if (jobCleanup) {
       t.setJobCleanupTask();
@@ -1187,6 +1236,17 @@ class TaskInProgress {
     return t;
   }
 
+  // GRR FIXME?  more efficient just to pass splitInfo directly...any need for
+  //             rest of it in UberTask?
+  TaskSplitIndex[] getSplitIndexArray() {
+    int numSplits = splitInfo.length;
+    TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits];
+    for (int i = 0; i < numSplits; ++i) {
+      splitIndex[i] = splitInfo[i].getSplitIndex();
+    }
+    return splitIndex;
+  }
+
   boolean isRunningTask(TaskAttemptID taskid) {
     TaskStatus status = taskStatuses.get(taskid);
     return status != null && status.getRunState() == TaskStatus.State.RUNNING;
@@ -1236,7 +1296,8 @@ class TaskInProgress {
   }
     
   /**
-   * Get the id of this map or reduce task.
+   * Get the task index of this map or reduce task.  For example,
+   * "task_201011230308_87259_r_000240" would return 240.
    * @return The index of this tip in the maps/reduces lists.
    */
   public int getIdWithinJob() {
@@ -1256,7 +1317,7 @@ class TaskInProgress {
   public int getSuccessEventNumber() {
     return successEventNumber;
   }
-  
+
   /** 
    * Gets the Node list of input split locations sorted in rack order.
    */ 
@@ -1264,7 +1325,7 @@ class TaskInProgress {
     if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
-    String[] splits = splitInfo.getLocations();
+    String[] splits = splitInfo[0].getLocations();  // actually replicas
     Node[] nodes = new Node[splits.length];
     for (int i = 0; i < splits.length; i++) {
       nodes[i] = jobtracker.getNode(splits[i]);
@@ -1293,13 +1354,20 @@ class TaskInProgress {
   }
 
   public long getMapInputSize() {
-    if(isMapTask() && !jobSetup && !jobCleanup) {
-      return splitInfo.getInputDataLength();
+    if (isUberTask()) {
+      int numSplits = splitInfo.length;
+      long sumInputDataLength = 0;
+      for (int i = 0; i < numSplits; ++i) {
+        sumInputDataLength += splitInfo[i].getInputDataLength();
+      }
+      return sumInputDataLength;
+    } else if (isMapTask() && !jobSetup && !jobCleanup) {
+      return splitInfo[0].getInputDataLength();
     } else {
       return 0;
     }
   }
-  
+
   /**
    * Compare most recent task attempts dispatch time to current system time so
    * that task progress rate will slow down as time proceeds even if no progress
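
getMapInputSize() above now has three cases; the uber branch simply sums the per-split input lengths. A sketch of that aggregation, with a hypothetical long[] standing in for splitInfo[i].getInputDataLength():

    public class UberInputSizeSketch {
      static long mapInputSize(long[] splitLengths, boolean isUber, boolean isMap) {
        if (isUber) {
          long sum = 0;
          for (long len : splitLengths) {
            sum += len;              // whole-job map input for the uber TIP
          }
          return sum;
        } else if (isMap) {
          return splitLengths[0];    // plain map TIP holds exactly one split
        }
        return 0;                    // reduces, setup, cleanup
      }

      public static void main(String[] args) {
        System.out.println(mapInputSize(new long[] {64, 128, 256}, true, false)); // 448
      }
    }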

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar  8 05:54:13 2011
@@ -41,10 +41,15 @@ public abstract class TaskStatus impleme
   static final Log LOG =
     LogFactory.getLog(TaskStatus.class.getName());
   
-  //enumeration for reporting current phase of a task.
+  // what kind of TaskStatus is it?
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
+  public static enum Type {MAP, REDUCE, UBER}
+
+  // enumeration for reporting current phase of a task.
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static enum Phase {STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
 
   // what state is the task in?
   @InterfaceAudience.Private
@@ -105,6 +110,7 @@ public abstract class TaskStatus impleme
   
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
+  public abstract boolean getIsUber();
   public int getNumSlots() {
     return numSlots;
   }
@@ -495,46 +501,61 @@ public abstract class TaskStatus impleme
   //////////////////////////////////////////////////////////////////////////////
   // Factory-like methods to create/read/write appropriate TaskStatus objects
   //////////////////////////////////////////////////////////////////////////////
-  
-  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, 
-                                     float progress, int numSlots,
-                                     State runState, String diagnosticInfo,
-                                     String stateString, String taskTracker,
-                                     Phase phase, Counters counters) 
-  throws IOException {
-    boolean isMap = in.readBoolean();
-    return createTaskStatus(isMap, taskId, progress, numSlots, runState, 
-                            diagnosticInfo, stateString, taskTracker, phase, 
-                            counters);
-  }
-  
+
+  // this is the main one used by TT.TIP, JIP, and (apparently) all the
+  // relevant tests (CapacityTestUtils, TestFairScheduler, TestClusterStatus,
+  // TestJobInProgress, TestJobTrackerInstrumentation, FakeObjectUtilities)
+  // [also formerly MapTask and ReduceTask, but no longer]
   static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, 
                                      float progress, int numSlots,
                                      State runState, String diagnosticInfo,
                                      String stateString, String taskTracker,
                                      Phase phase, Counters counters) { 
     return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState, 
-                                       diagnosticInfo, stateString, taskTracker, 
+                                       diagnosticInfo, stateString, taskTracker,
                                        phase, counters) :
                      new ReduceTaskStatus(taskId, progress, numSlots, runState, 
                                           diagnosticInfo, stateString, 
                                           taskTracker, phase, counters);
   }
-  
-  static TaskStatus createTaskStatus(boolean isMap) {
-    return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
+
+  // used only in default ctors of Task (also formerly MapTask, ReduceTask) and
+  // readTaskStatus() below
+  static TaskStatus createTaskStatus(Type tsType) {
+    return (tsType == TaskStatus.Type.MAP)
+        ? new MapTaskStatus()
+        : (tsType == TaskStatus.Type.REDUCE)
+            ? new ReduceTaskStatus()
+            : new UberTaskStatus();
   }
 
   static TaskStatus readTaskStatus(DataInput in) throws IOException {
     boolean isMap = in.readBoolean();
-    TaskStatus taskStatus = createTaskStatus(isMap);
+    boolean isUber = in.readBoolean();
+    Type tsType = isMap
+        ? TaskStatus.Type.MAP
+        : isUber
+            ? TaskStatus.Type.UBER
+            : TaskStatus.Type.REDUCE;
+    TaskStatus taskStatus = createTaskStatus(tsType);
     taskStatus.readFields(in);
     return taskStatus;
   }
   
   static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
   throws IOException {
+/* LATER
+ *  //GRR FIXME:  longer-term, just store tsType as member var (but then need
+ *  //            to modify or add new ctor:  used in many places)
+ *  Type tsType = taskStatus.getIsUber()
+ *      ? TaskStatus.Type.UBER
+ *      : taskStatus.getIsMap()
+ *          ? TaskStatus.Type.MAP
+ *          : TaskStatus.Type.REDUCE;
+ *  WritableUtils.writeEnum(out, tsType); (or enum.ordinal() [as in MR-901] or ...)
+ */
     out.writeBoolean(taskStatus.getIsMap());
+    out.writeBoolean(taskStatus.getIsUber());
     taskStatus.write(out);
   }
 }
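
The two flag booleans written by writeTaskStatus() and consumed by readTaskStatus() form a small tagged union; since an uber task is treated as a reduce in most contexts, its status presumably reports getIsMap() == false, so testing isMap before isUber is safe. A self-contained sketch of just that dispatch (plain java.io, no Hadoop types):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TaskStatusTagSketch {
      enum Type { MAP, REDUCE, UBER }

      // mirrors writeTaskStatus(): two leading booleans tag the concrete subclass
      static void writeTag(DataOutput out, boolean isMap, boolean isUber)
          throws IOException {
        out.writeBoolean(isMap);
        out.writeBoolean(isUber);
      }

      // mirrors readTaskStatus()
      static Type readTag(DataInput in) throws IOException {
        boolean isMap = in.readBoolean();
        boolean isUber = in.readBoolean();
        return isMap ? Type.MAP : (isUber ? Type.UBER : Type.REDUCE);
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeTag(new DataOutputStream(buf), false, true);
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readTag(in));  // UBER
      }
    }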

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  8 05:54:13 2011
@@ -827,7 +827,7 @@ public class TaskTracker 
           f = rjob.getFetchStatus();
           for (TaskInProgress tip : rjob.tasks) {
             Task task = tip.getTask();
-            if (!task.isMapTask()) {
+            if (!task.isMapTask() && !task.isUberTask()) {
               if (((ReduceTask)task).getPhase() == 
                   TaskStatus.Phase.SHUFFLE) {
                 if (rjob.getFetchStatus() == null) {
@@ -2502,21 +2502,27 @@ public class TaskTracker 
       this.lastProgressReport = System.currentTimeMillis();
       this.defaultJobConf = conf;
       localJobConf = null;
-      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
-                                               0.0f, 
-                                               task.getNumSlotsRequired(),
-                                               task.getState(),
-                                               diagnosticInfo.toString(), 
-                                               "initializing",  
-                                               getName(), 
-                                               task.isTaskCleanupTask() ? 
-                                                 TaskStatus.Phase.CLEANUP :  
-                                               task.isMapTask()? TaskStatus.Phase.MAP:
-                                               TaskStatus.Phase.SHUFFLE,
-                                               task.getCounters()); 
+      if (task.isUberTask()) {
+        taskStatus = new UberTaskStatus(
+            task.getTaskID(), 0.0f,
+            task.getNumSlotsRequired(), task.getState(),
+            diagnosticInfo.toString(), "initializing", getName(),
+            TaskStatus.Phase.MAP, task.getCounters());
+      } else {
+        taskStatus = TaskStatus.createTaskStatus(
+            task.isMapTask(), task.getTaskID(), 0.0f,
+            task.getNumSlotsRequired(), task.getState(),
+            diagnosticInfo.toString(), "initializing", getName(),
+            task.isTaskCleanupTask()
+              ? TaskStatus.Phase.CLEANUP
+              : task.isMapTask()
+                ? TaskStatus.Phase.MAP
+                : TaskStatus.Phase.SHUFFLE,
+            task.getCounters());
+      }
       taskTimeout = (10 * 60 * 1000);
     }
-        
+
     void localizeTask(Task task) throws IOException{
 
       FileSystem localFs = FileSystem.getLocal(fConf);
@@ -2633,7 +2639,7 @@ public class TaskTracker 
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
         LOG.info("Not launching task: " + task.getTaskID() + 
-            " since it's state is " + this.taskStatus.getRunState());
+            " since its state is " + this.taskStatus.getRunState());
       }
     }
 
@@ -3206,7 +3212,7 @@ public class TaskTracker 
     
     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
     if (!jvmManager.isJvmKnown(jvmId)) {
-      LOG.info("Killing unknown JVM " + jvmId);
+      LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME:  bug?  no (apparent) killing going on here...
       return new JvmTask(null, true);
     }
     RunningJob rjob = runningJobs.get(jvmId.getJobId());
@@ -3237,7 +3243,7 @@ public class TaskTracker 
   public synchronized boolean statusUpdate(TaskAttemptID taskid, 
                                               TaskStatus taskStatus) 
   throws IOException {
-    TaskInProgress tip = tasks.get(taskid);
+    TaskInProgress tip = tasks.get(taskid);  // TT.TIP, not TaskInProgress.java
     if (tip != null) {
       tip.reportProgress(taskStatus);
       myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
@@ -3685,17 +3691,15 @@ public class TaskTracker 
         }
         userName = rjob.jobConf.getUser();
       }
-      // Index file
-      Path indexFileName =
-          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-              userName, jobId, mapId)
-              + "/file.out.index", conf);
-
-      // Map-output file
-      Path mapOutputFileName =
-          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-              userName, jobId, mapId)
-              + "/file.out", conf);
+
+      // Map-output filename ("... /file.out")
+      StringBuilder sb = new StringBuilder();
+      sb.append(TaskTracker.getIntermediateOutputDir(userName, jobId, mapId))
+          .append("/").append(MapOutputFile.MAP_OUTPUT_FILENAME_STRING);
+      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
+      // Index filename ("... /file.out.index")
+      sb.append(MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+      Path indexFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
 
       /**
        * Read the index file to get the information about where the map-output
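
One subtlety in the rewritten path construction above: the index path is derived by appending the suffix to the same StringBuilder, so the map-output path must be captured before the append. A sketch with hypothetical stand-ins for the MapOutputFile constants:

    public class MapOutputPathSketch {
      // hypothetical values; the real ones are MapOutputFile.MAP_OUTPUT_FILENAME_STRING
      // and MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING
      static final String MAP_OUTPUT_FILENAME = "file.out";
      static final String MAP_OUTPUT_INDEX_SUFFIX = ".index";

      public static void main(String[] args) {
        String dir = "taskTracker/someuser/jobcache/job_x/attempt_y/output"; // hypothetical layout
        StringBuilder sb = new StringBuilder();
        sb.append(dir).append("/").append(MAP_OUTPUT_FILENAME);
        String mapOutputFile = sb.toString();       // ... /file.out
        sb.append(MAP_OUTPUT_INDEX_SUFFIX);
        String indexFile = sb.toString();           // ... /file.out.index
        System.out.println(mapOutputFile);
        System.out.println(indexFile);
      }
    }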

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079193&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar  8 05:54:13 2011
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.MRJobConfig;  // JobContext.SKIP_RECORDS
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.TaskType;  // MAP, JOB_SETUP, TASK_CLEANUP...
+import org.apache.hadoop.util.Progress;
+
+class UberTask extends Task {
+  private TaskSplitIndex[] splits;
+  private int numMapTasks;
+  private int numReduceTasks;
+  private boolean jobSetupCleanupNeeded;
+
+  private static final Log LOG = LogFactory.getLog(UberTask.class.getName());
+
+  private Progress[] subPhases;  // persistent storage for MapTasks, ReduceTask
+
+  // "instance initializer":  executed between Task and UberTask constructors
+  {
+    // cannot call setPhase() here now that createTaskStatus() is called in
+    // Task subclass(es):  initializer is executed before subclass ctor =>
+    // taskStatus still null => NPE
+    getProgress().setStatus("uber"); // Task.java: change name of root Progress
+  }
+
+  public UberTask() {
+    super();
+    this.taskStatus = new UberTaskStatus();
+  }
+
+  public UberTask(String jobFile, TaskAttemptID taskId, int partition,
+                  TaskSplitIndex[] splits, int numReduceTasks,
+                  int numSlotsRequired, boolean jobSetupCleanupNeeded) {
+    super(jobFile, taskId, partition, numSlotsRequired);
+    this.splits = splits;
+    this.numMapTasks = splits.length;
+    this.numReduceTasks = numReduceTasks;
+    this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
+    this.taskStatus = new UberTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
+                                         TaskStatus.State.UNASSIGNED,
+                                         "", "", "", TaskStatus.Phase.MAP,
+                                         getCounters());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("UberTask " + taskId + " constructed with " + numMapTasks
+                + " sub-maps and " + numReduceTasks + " sub-reduces");
+    }
+  }
+
+  @Override
+  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
+      throws IOException {
+    return new UberTaskRunner(tip, tracker, conf);
+  }
+
+  /* perhaps someday we'll allow an UberTask to run as either a MapTask or a
+   * ReduceTask, but for now it's the latter only */
+  @Override
+  public boolean isMapTask() {
+    return false;
+  }
+
+  /**
+   * Is this really a combo-task masquerading as a plain ReduceTask?  Yup.
+   */
+  @Override
+  public boolean isUberTask() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+  throws IOException, ClassNotFoundException, InterruptedException {
+    this.umbilical = umbilical;
+
+    // set up two-level Progress/phase tree:  getProgress() is root ("uber"),
+    // and subtasks' "root node" Progress is second level (will override
+    // native one when construct each subtask)
+    subPhases = new Progress[numMapTasks + numReduceTasks];
+    for (int j=0; j < numMapTasks; ++j) {
+      subPhases[j] = getProgress().addPhase("map " + String.valueOf(j+1));
+    }
+    for (int j = numMapTasks; j < numMapTasks + numReduceTasks; ++j) {
+      subPhases[j] =
+        getProgress().addPhase("reduce " + String.valueOf(j - numMapTasks + 1));
+    }
+    // we could set up each subtask's phases, too, but would need to store all
+    // (2*numMapTasks + 2*numReduceTasks) of them here, and subtasks already
+    // have storage allocated (mapPhase, sortPhase, copyPhase, reducePhase) in
+    // MapTask and ReduceTask--instead, will call new accessor for each after
+    // each subtask is created
+
+    // Start thread that will handle communication with parent.  Note that this
+    // is NOT the reporter the subtasks will use--we want them to get one that
+    // knows nothing of umbilical, so that calls to it will pass through us,
+    // changing the task ID to our own (UberTask's) before sending progress on
+    // up via this reporter.  (No need for the subtask reporter also to adjust
+    // the progress percentage; we get that for free from the phase tree.)
+    TaskReporter reporter = startReporter(umbilical);
+
+    // use context objects API?
+    boolean useNewApi = job.getUseNewMapper();   // "mapred.mapper.new-api"
+    assert useNewApi == job.getUseNewReducer();  // enforce consistency
+
+    // initialize the ubertask (sole "real" task as far as framework is
+    // concerned); this is where setupTask() is called
+    initialize(job, getJobID(), reporter, useNewApi);
+
+    // Generate the map TaskAttemptIDs we need to run.
+    // Regular tasks are handed their TaskAttemptIDs via TaskInProgress's
+    // getTaskToRun() and addRunningTask(), but that approach doesn't work
+    // for us.  Ergo, we create our own--technically bypassing the nextTaskId
+    // limits in getTaskToRun(), but since the point of UberTask is to roll
+    // up too-small jobs into a single, more normal-sized ubertask (whose
+    // single TaskAttemptID _is_ subject to the limits), that's reasonable.
+    TaskAttemptID[] mapIds = createMapIds();
+
+    // set up the job
+    if (jobSetupCleanupNeeded) {
+      runSetupJob(reporter);
+    }
+
+    // run the maps
+    if (numMapTasks > 0) {
+      runUberMapTasks(job, mapIds, splits, umbilical, reporter);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("UberTask " + getTaskID() + " has no sub-MapTasks to run");
+      }
+    }
+
+    if (numReduceTasks > 0) {
+      // may make sense to lift this restriction at some point, but for now
+      // code is written to support one at most:
+      if (numReduceTasks > 1) {
+        throw new IOException("UberTask invoked with " + numReduceTasks
+                              + " reduces (1 max)");
+      }
+
+      // set up the reduce ...
+      Class keyClass = job.getMapOutputKeyClass();
+      Class valueClass = job.getMapOutputValueClass();
+      RawComparator comparator = job.getOutputValueGroupingComparator();
+
+      // ... then run it (using our own [reduce] TaskAttemptID)
+      runUberReducer(job, getTaskID(), mapIds.length, umbilical, reporter,
+                     comparator, keyClass, valueClass);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("UberTask " + getTaskID() + " has no sub-ReduceTasks to run");
+      }
+    }
+
+    // clean up the job (switch phase to "cleanup" and delete staging dir, but
+    // do NOT delete temp dir yet)
+    if (jobSetupCleanupNeeded) {
+      runCommitAbortJob(reporter);
+    }
+
+    // this is where commitTask() (or abortTask()) is called
+    done(umbilical, reporter);
+
+    // now finish cleaning up the job (delete temp dir:  results are committed)
+    if (jobSetupCleanupNeeded) {
+      commitJob();
+    }
+  }
+
+  private TaskAttemptID[] createMapIds() {
+    TaskAttemptID[] mapIds = new TaskAttemptID[numMapTasks];
+    // Note that the reducer always starts looking for ID 0 (output/map_0.out),
+    // so it's not possible (or at least not easy) to add an offset to the
+    // mapIds.  However, since ~nobody but us ever sees them (thanks to
+    // SubTaskReporter translation below--progress is reported to TT using
+    // UberTask's ID), it's OK to overlap our own task ID and potentially
+    // those of the setup and cleanup tasks.
+    for (int j = 0; j < mapIds.length; ++j) {
+      mapIds[j] = new TaskAttemptID(new TaskID(getJobID(), TaskType.MAP, j), 0);
+    }
+    return mapIds;
+  }
+
+  //GRR PERF TODO:  make sure we're not crossing disk boundaries (the
+  //  optimization here is to simply rename map outputs to reduce inputs,
+  //  skipping the in-memory/disk-spill shuffle machinery entirely)
+  private void renameMapOutputForReduce(TaskAttemptID mapId,
+                                        MapOutputFile subMapOutputFile)
+  throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf); //PERF FIXME? could pass in
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        mapId.getTaskID(), localFs.getLength(mapOut));
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn)) {
+      throw new IOException("Couldn't rename " + mapOut);
+    }
+  }
+
+  private void runSetupJob(TaskReporter reporter)
+  throws IOException, InterruptedException {
+    runJobSetupTask(umbilical, reporter);
+  }
+
+  private void runCommitAbortJob(TaskReporter reporter)
+  throws IOException, InterruptedException {
+    // if we (uber) got this far without _ourselves_ being whacked, then we've
+    // succeeded
+    setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    runJobCleanupTask(umbilical, reporter);
+  }
+
+  /**
+   * This is basically an uber-specific version of MapTask's run() method.
+   * It loops over the map subtasks sequentially.  runUberReducer() (below)
+   * is the corresponding replacement for ReduceTask's run().
+   */
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runUberMapTasks(final JobConf job,
+                       final TaskAttemptID[] mapIds,
+                       final TaskSplitIndex[] splits,
+                       final TaskUmbilicalProtocol umbilical,
+                       TaskReporter reporter)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    boolean useNewApi = job.getUseNewMapper();  // use context objects API?
+
+    for (int j=0; j < mapIds.length; ++j) {
+      MapTask map = new MapTask(getJobFile(), mapIds[j], j, splits[j], 1);
+      JobConf localConf = new JobConf(job);
+      map.localizeConfiguration(localConf);
+      map.setConf(localConf);
+      // for reporting purposes (to TT), use our (uber) task ID, not subtask's:
+      map.setTaskIdForUmbilical(getTaskID());
+
+      // override MapTask's "root" Progress node with our second-level one...
+      map.setProgress(subPhases[j]);
+      // ...and add two third-level Progress nodes
+      map.createPhase(TaskStatus.Phase.MAP, "map", 0.667f);
+      map.createPhase(TaskStatus.Phase.SORT, "sort", 0.333f);
+
+      TaskReporter subReporter =
+          new SubTaskReporter(map.getProgress(), reporter, j);
+      map.initialize(localConf, getJobID(), subReporter, useNewApi);
+
+      LOG.info("UberTask " + getTaskID() + " running sub-MapTask " + (j+1)
+               + "/" + numMapTasks);
+
+      if (useNewApi) {
+        MapTask.runNewMapper(map, localConf, splits[j], umbilical, subReporter);
+      } else {
+        MapTask.runOldMapper(map, localConf, splits[j], umbilical, subReporter);
+      }
+      updateCounters(map);
+
+      // Set own progress to 1.0 and move to next sibling node in Progress/phase
+      // tree.  NOTE:  this works but is slightly fragile.  Sibling doesn't
+      // yet exist, but since internal startNextPhase() call merely updates
+      // currentPhase index without "dereferencing" it, this is OK as long as
+      // no one calls phase() on parent Progress (or get()?) in interim.
+      map.getProgress().complete();
+
+      // Signal the communication thread to pass any progress on up to the TT.
+      // (This and the renameMapOutputForReduce() optimization below are the
+      // sole bits of output committer's "commitTask()" method that we actually
+      // want/need, so consider the subtask committed at this point.)
+      reporter.progress();
+
+      // Every map will produce file.out (in the same dir), so rename as we go.
+      //GRR FIXME:  is the conditionalized approach a behavior change?  Do
+      //  map-only jobs produce file.out, map_#.out, or nothing at all?  If
+      //  not renamed here, do we suddenly lose data that we used to preserve?
+      if (numReduceTasks > 0) {
+        renameMapOutputForReduce(mapIds[j], map.getMapOutputFile());
+      }
+    }
+  }
+
+  /**
+   * This is basically an uber-specific version of ReduceTask's run() method.
+   * It currently supports only a single reducer (or none, in the trivial sense
+   * of not being called in that case).
+   */
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runUberReducer(JobConf job, TaskAttemptID reduceId, int numMaps,
+                      final TaskUmbilicalProtocol umbilical,
+                      final TaskReporter reporter,
+                      RawComparator<INKEY> comparator,
+                      Class<INKEY> keyClass,
+                      Class<INVALUE> valueClass)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    boolean useNewApi = job.getUseNewReducer();  // use context objects API?
+    ReduceTask reduce = new ReduceTask(getJobFile(), reduceId, 0, numMaps, 1);
+    JobConf localConf = new JobConf(job);
+    reduce.localizeConfiguration(localConf);
+    reduce.setConf(localConf);
+    localConf.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
+
+    // override ReduceTask's "root" Progress node with our second-level one...
+    reduce.setProgress(subPhases[numMapTasks + 0]);
+    // ...and add two third-level Progress nodes (SHUFFLE/"copy" is unnecessary)
+    reduce.createPhase(TaskStatus.Phase.SORT, "sort");
+    reduce.createPhase(TaskStatus.Phase.REDUCE, "reduce");
+
+    // subtaskIndex of reduce is one bigger than that of last map,
+    // i.e., (numMapTasks-1) + 1
+    TaskReporter subReporter =
+        new SubTaskReporter(reduce.getProgress(), reporter, numMapTasks);
+    reduce.initialize(localConf, getJobID(), subReporter, useNewApi);
+
+    LOG.info("UberTask " + getTaskID() + " running sub-ReduceTask 1/"
+             + numReduceTasks);
+
+    // note that this is implicitly the "isLocal" branch of ReduceTask run():
+    // we don't have a shuffle phase
+    // (=> should skip adding Phase in first place and use 50/50 split? FIXME)
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    RawKeyValueIterator rIter =
+        Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+                     job.getMapOutputValueClass(), null,   // no codec
+                     ReduceTask.getMapFiles(reduce, rfs, true),
+                     !conf.getKeepFailedTaskFiles(),
+                     job.getInt(JobContext.IO_SORT_FACTOR, 100),
+                     new Path(getTaskID().toString()),
+                     job.getOutputKeyComparator(),
+                     subReporter, spilledRecordsCounter,
+                     null, null);   // no writesCounter or mergePhase
+
+    // set progress = 1.0 and move _parent's_ index to next sibling phase:
+    reduce.completePhase(TaskStatus.Phase.SORT);  // "sortPhase.complete()"
+    reduce.taskStatus.setPhase(TaskStatus.Phase.REDUCE);
+
+    if (useNewApi) {
+      ReduceTask.runNewReducer(reduce, job, umbilical, subReporter,
+                               rIter, comparator, keyClass, valueClass);
+    } else {
+      ReduceTask.runOldReducer(reduce, job, umbilical, subReporter,
+                               rIter, comparator, keyClass, valueClass);
+    }
+    updateCounters(reduce);
+
+    // set own progress to 1.0 and move to [nonexistent] next sibling node in
+    // Progress/phase tree; this will cause parent node's progress (UberTask's)
+    // to be set to 1.0, too (at least, assuming all previous siblings have
+    // done so, too...Progress/phase stuff is fragile in more ways than one)
+    reduce.getProgress().complete();
+
+    // signal the communication thread to pass any progress on up to the TT
+    reporter.progress();
+  }
+
+  /**
+   * Updates uber-counters with values from completed subtasks.
+   * @param subtask  a map or reduce subtask that has just been successfully
+   *                 completed
+   */
+  private void updateCounters(Task subtask) {
+    getCounters().incrAllCounters(subtask.getCounters());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    if (isMapOrReduce()) {  //GRR Q:  why would this ever NOT be true?
+      out.writeBoolean(jobSetupCleanupNeeded);
+      WritableUtils.writeVInt(out, numMapTasks);
+      WritableUtils.writeVInt(out, numReduceTasks);
+      for (TaskSplitIndex splitIndex : splits) {
+        splitIndex.write(out);
+      }
+      splits = null;
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    if (isMapOrReduce()) {  //GRR Q:  why would this ever NOT be true?
+      jobSetupCleanupNeeded = in.readBoolean();
+      numMapTasks = WritableUtils.readVInt(in);
+      numReduceTasks = WritableUtils.readVInt(in);
+      splits = new TaskSplitIndex[numMapTasks];
+      for (int j=0; j < numMapTasks; ++j) {
+        TaskSplitIndex splitIndex = new TaskSplitIndex();
+        splitIndex.readFields(in);
+        splits[j] = splitIndex;
+      }
+    }
+  }
+
+  /**
+   * In our superclass, the communication thread handles communication with
+   * the parent (TaskTracker) via the umbilical, but that works only if the
+   * TaskTracker is aware of the task's ID--which is true of us (UberTask)
+   * but not our map and reduce subtasks.  Ergo, intercept subtasks' progress
+   * reports and pass them on as our own, i.e., use our own uber-taskID in
+   * place of the subtasks' bogus ones.  (Leave the progress percentage alone;
+   * the phase/Progress hierarchy we set up in run() and runUber*() will take
+   * care of that.)
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  protected class SubTaskReporter extends Task.TaskReporter
+  implements Runnable, Reporter {
+
+    private Progress taskProgress;
+    private TaskReporter uberReporter;
+    private int subtaskIndex;
+    // subtaskIndex goes from 0 to (m-1+r), where m = number of maps and
+    // r = number of reduces.  (The latter can be either 0 or 1, and m+r >= 1.)
+
+    SubTaskReporter(Progress progress, TaskReporter reporter, int subtaskIdx) {
+      super(progress, null);
+      this.taskProgress = progress;
+      this.uberReporter = reporter;
+      this.subtaskIndex = subtaskIdx;
+    }
+
+    @Override
+    public void run() {
+      // make sure this never gets called...
+      LOG.fatal("UberTask " + getTaskID() + " SubTaskReporter run() called "
+                + "unexpectedly for subtask index " + subtaskIndex);
+      assert "uh oh:  SubTaskReporter's run() method was called!".isEmpty();
+    }
+
+    // just one (real) intercepted method
+    @Override
+    public void setProgress(float progress) {
+      // update _our_ taskProgress [no need to do uber's, too:  ultimately does
+      // get() on uber's taskProgress], but set _uberReporter's_ progress flag
+      taskProgress.phase().set(progress);
+      uberReporter.setProgressFlag();
+    }
+  }
+
+}

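UberTask.run() above hangs one equally-weighted Progress child off the root per subtask, so a subtask's complete() both finishes its own node and advances the parent's current-phase index. A standalone sketch of that arithmetic, assuming the equal-weight addPhase(String) semantics of org.apache.hadoop.util.Progress (not part of the patch):

    import org.apache.hadoop.util.Progress;

    // Sketch of the two-level phase tree UberTask.run() builds:
    // one equally-weighted child per subtask.
    public class ProgressTreeSketch {
      public static void main(String[] args) {
        Progress uber = new Progress();               // root, as in getProgress()
        Progress map1 = uber.addPhase("map 1");
        Progress map2 = uber.addPhase("map 2");
        Progress reduce1 = uber.addPhase("reduce 1");

        map1.complete();   // sets map1 to 1.0 AND advances uber's phase index
        map2.set(0.5f);    // halfway through the second sub-map

        // Sequential-phase math: finished phases count in full, the current
        // phase in part, later phases not at all:  (1 + 0.5 + 0) / 3 = 0.5
        System.out.println(uber.get());   // ~0.5
      }
    }

This scaling by the tree is what lets SubTaskReporter pass raw subtask progress upward without adjusting the percentage itself.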
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java?rev=1079193&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java Tue Mar  8 05:54:13 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.log4j.Level;
+
+public class UberTaskRunner extends TaskRunner {
+
+  public UberTaskRunner(TaskInProgress tip, TaskTracker tracker, JobConf conf) {
+    super(tip, tracker, conf);
+  }
+
+  @Override
+  public String getChildJavaOpts(JobConf jobConf, String defaultValue) {
+    return jobConf.get(JobConf.MAPRED_UBER_TASK_JAVA_OPTS,
+                       super.getChildJavaOpts(jobConf,
+                           JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
+  }
+
+  @Override
+  public int getChildUlimit(JobConf jobConf) {
+    return jobConf.getInt(JobConf.MAPRED_UBER_TASK_ULIMIT,
+                          super.getChildUlimit(jobConf));
+  }
+
+  @Override
+  public String getChildEnv(JobConf jobConf) {
+    return jobConf.get(JobConf.MAPRED_UBER_TASK_ENV,
+                       super.getChildEnv(jobConf));
+  }
+
+  @Override
+  public Level getLogLevel(JobConf jobConf) {
+    //GRR FIXME?  what if this differs from JobConf.MAPRED_MAP_TASK_LOG_LEVEL?
+    return Level.toLevel(jobConf.get(JobConf.MAPRED_REDUCE_TASK_LOG_LEVEL,
+                                     JobConf.DEFAULT_LOG_LEVEL.toString()));
+  }
+
+}

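UberTaskRunner consults the uber-specific keys first and falls back to TaskRunner's regular settings when they are unset. A hedged sketch of setting them from a client; the values are illustrative only, and the sketch sits in the same package in case the new JobConf constants are not public:

    package org.apache.hadoop.mapred;  // in case the new constants aren't public

    public class UberChildSettingsSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Give the single uber JVM roomier limits than an ordinary child task;
        // UberTaskRunner falls back to the regular task settings when unset.
        conf.set(JobConf.MAPRED_UBER_TASK_JAVA_OPTS, "-Xmx1024m");      // illustrative
        conf.setInt(JobConf.MAPRED_UBER_TASK_ULIMIT, 2 * 1024 * 1024);  // illustrative
        conf.set(JobConf.MAPRED_UBER_TASK_ENV, "LD_LIBRARY_PATH=/opt/native");
      }
    }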
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java?rev=1079193&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java Tue Mar  8 05:54:13 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+class UberTaskStatus extends TaskStatus {
+
+  private long mapFinishTime;
+  // (the map-side analogue of sortFinishTime is essentially irrelevant here)
+
+  private long shuffleFinishTime;
+  private long sortFinishTime;
+  private List<TaskAttemptID> failedFetchTasks = new ArrayList<TaskAttemptID>(1);
+
+  public UberTaskStatus() {}
+
+  public UberTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
+                        State runState, String diagnosticInfo,
+                        String stateString, String taskTracker, Phase phase,
+                        Counters counters) {
+    super(taskid, progress, numSlots, runState, diagnosticInfo, stateString,
+          taskTracker, phase, counters);
+  }
+
+  @Override
+  public Object clone() {
+    UberTaskStatus myClone = (UberTaskStatus)super.clone();
+    myClone.failedFetchTasks = new ArrayList<TaskAttemptID>(failedFetchTasks);
+    return myClone;
+  }
+
+  @Override
+  public boolean getIsMap() {
+    return false;
+  }
+
+  @Override
+  public boolean getIsUber() {
+    return true;
+  }
+
+  @Override
+  void setFinishTime(long finishTime) {
+    if (mapFinishTime == 0) {
+      this.mapFinishTime = finishTime;
+    }
+    if (shuffleFinishTime == 0) {
+      this.shuffleFinishTime = finishTime;
+    }
+    if (sortFinishTime == 0){
+      this.sortFinishTime = finishTime;
+    }
+    super.setFinishTime(finishTime);
+  }
+
+  @Override
+  public long getMapFinishTime() {
+    return mapFinishTime;
+  }
+
+  @Override
+  void setMapFinishTime(long mapFinishTime) {
+    this.mapFinishTime = mapFinishTime;
+  }
+
+  @Override
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  @Override
+  void setShuffleFinishTime(long shuffleFinishTime) {
+    if (mapFinishTime == 0) {
+      this.mapFinishTime = shuffleFinishTime;
+    }
+    this.shuffleFinishTime = shuffleFinishTime;
+  }
+
+  @Override
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  @Override
+  void setSortFinishTime(long sortFinishTime) {
+    if (mapFinishTime == 0) {
+      this.mapFinishTime = sortFinishTime;
+    }
+    if (shuffleFinishTime == 0) {
+      this.shuffleFinishTime = sortFinishTime;
+    }
+    this.sortFinishTime = sortFinishTime;
+  }
+
+  @Override
+  public List<TaskAttemptID> getFetchFailedMaps() {
+    return failedFetchTasks;
+  }
+
+  @Override
+  public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+    failedFetchTasks.add(mapTaskId);
+  }
+
+  @Override
+  synchronized void statusUpdate(TaskStatus status) {
+    super.statusUpdate(status);
+
+    if (status.getIsMap()) {  // status came from a sub-MapTask
+      if (status.getMapFinishTime() != 0) {
+        this.mapFinishTime = status.getMapFinishTime();
+      }
+
+    } else {                  // status came from a sub-ReduceTask
+      if (status.getShuffleFinishTime() != 0) {
+        this.shuffleFinishTime = status.getShuffleFinishTime();
+      }
+
+      if (status.getSortFinishTime() != 0) {
+        sortFinishTime = status.getSortFinishTime();
+      }
+
+      List<TaskAttemptID> newFetchFailedMaps = status.getFetchFailedMaps();
+      if (failedFetchTasks == null) {
+        failedFetchTasks = newFetchFailedMaps;
+      } else if (newFetchFailedMaps != null){
+        failedFetchTasks.addAll(newFetchFailedMaps);
+      }
+    }
+  }
+
+  @Override
+  synchronized void clearStatus() {
+    super.clearStatus();
+    failedFetchTasks.clear();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    mapFinishTime = in.readLong();
+    shuffleFinishTime = in.readLong();
+    sortFinishTime = in.readLong();
+    int numFailedFetchTasks = in.readInt();
+    failedFetchTasks = new ArrayList<TaskAttemptID>(numFailedFetchTasks);
+    for (int i=0; i < numFailedFetchTasks; ++i) {
+      TaskAttemptID taskId = new TaskAttemptID();
+      taskId.readFields(in);
+      failedFetchTasks.add(taskId);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeLong(mapFinishTime);
+    out.writeLong(shuffleFinishTime);
+    out.writeLong(sortFinishTime);
+    out.writeInt(failedFetchTasks.size());
+    for (TaskAttemptID taskId : failedFetchTasks) {
+      taskId.write(out);
+    }
+  }
+
+}

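The finish-time setters above deliberately back-fill the earlier phases, so a timestamp recorded out of order still yields a monotonic phase timeline. An illustrative sketch (values arbitrary; the class is package-private, hence the package declaration):

    package org.apache.hadoop.mapred;  // UberTaskStatus is package-private

    // Sketch of the timestamp back-filling above: recording a later phase's
    // finish time first still produces a consistent timeline.
    public class FinishTimeBackfillSketch {
      public static void main(String[] args) {
        UberTaskStatus status = new UberTaskStatus();
        status.setSortFinishTime(3000L);  // map/shuffle times were still 0...
        System.out.println(status.getMapFinishTime());      // ...so both are
        System.out.println(status.getShuffleFinishTime());  // back-filled: 3000
        System.out.println(status.getSortFinishTime());     // 3000
      }
    }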
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar  8 05:54:13 2011
@@ -271,4 +271,19 @@ public interface MRJobConfig {
     "mapreduce.job.submithostname";
   public static final String JOB_SUBMITHOSTADDR =
     "mapreduce.job.submithostaddress";
+
+  public static final String JOB_UBERTASK_ENABLE =
+    "mapreduce.job.ubertask.enable";
+  public static final String JOB_UBERTASK_MAXMAPS =
+    "mapreduce.job.ubertask.maxmaps";
+  public static final String JOB_UBERTASK_MAXREDUCES =
+    "mapreduce.job.ubertask.maxreduces";
+  public static final String JOB_UBERTASK_MAXBYTES =
+    "mapreduce.job.ubertask.maxbytes";
+  public static final String UBERTASK_JAVA_OPTS =
+    "mapreduce.ubertask.child.java.opts";  // or mapreduce.uber.java.opts?
+  public static final String UBERTASK_ULIMIT =
+    "mapreduce.ubertask.child.ulimit";     // or mapreduce.uber.ulimit?
+  public static final String UBERTASK_ENV =
+    "mapreduce.ubertask.child.env";        // or mapreduce.uber.env?
 }

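Taken together, the new keys let a client opt small jobs into ubertasking and bound what qualifies. A hedged client-side sketch; the threshold values are illustrative, and how the framework enforces each bound is defined elsewhere in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    // Hedged sketch: enable ubertasking and bound which jobs qualify.
    public class UberJobConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
        conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);            // illustrative
        conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);         // patch allows <= 1
        conf.setLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, 64L << 20);  // illustrative
        conf.set(MRJobConfig.UBERTASK_JAVA_OPTS, "-Xmx1024m");       // illustrative
      }
    }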
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar  8 05:54:13 2011
@@ -64,6 +64,9 @@
           {"name": "launchTime", "type": "long"},
           {"name": "totalMaps", "type": "int"},
           {"name": "totalReduces", "type": "int"},
+          {"name": "isUber", "type": "boolean"},
+          {"name": "numUberSubMaps", "type": "int"},
+          {"name": "numUberSubReduces", "type": "int"},
           {"name": "jobStatus", "type": "string"}
       ]
      },

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Tue Mar  8 05:54:13 2011
@@ -526,6 +526,8 @@ public class HistoryViewer {
      int totalReduces = 0; 
      int totalCleanups = 0;
      int totalSetups = 0;
+    int numUberSubMaps = 0;
+    int numUberSubReduces = 0;
      int numFailedMaps = 0; 
      int numKilledMaps = 0;
      int numFailedReduces = 0; 
@@ -544,15 +546,20 @@ public class HistoryViewer {
      long cleanupFinished = 0;
      long setupStarted = 0;
      long setupFinished = 0;
-     
+    boolean isUber = false;
+
      /** Get total maps */
      public int getTotalMaps() { return totalMaps; } 
      /** Get total reduces */
      public int getTotalReduces() { return totalReduces; } 
-     /** Get number of clean up tasks */ 
+     /** Get number of cleanup tasks */ 
      public int getTotalCleanups() { return totalCleanups; }
-     /** Get number of set up tasks */
+     /** Get number of setup tasks */
      public int getTotalSetups() { return totalSetups; }
+    /** Get number of map subtasks within UberTask */
+    public int getNumUberSubMaps() { return numUberSubMaps; }
+    /** Get number of reduce subtasks within UberTask */
+    public int getNumUberSubReduces() { return numUberSubReduces; }
      /** Get number of failed maps */
      public int getNumFailedMaps() { return numFailedMaps; }
      /** Get number of killed maps */
@@ -567,11 +574,11 @@ public class HistoryViewer {
      public int getNumFailedCleanups() { return numFailedCleanups; }
      /** Get number of killed cleanup tasks */
      public int getNumKilledCleanups() { return numKilledCleanups; }
-     /** Get number of finished set up tasks */
+     /** Get number of finished setup tasks */
      public int getNumFinishedSetups() { return numFinishedSetups; }
-     /** Get number of failed set up tasks */
+     /** Get number of failed setup tasks */
      public int getNumFailedSetups() { return numFailedSetups; }
-     /** Get number of killed set up tasks */
+     /** Get number of killed setup tasks */
      public int getNumKilledSetups() { return numKilledSetups; }
      /** Get number of maps that were started */
      public long getMapStarted() { return mapStarted; } 
@@ -589,8 +596,10 @@ public class HistoryViewer {
      public long getSetupStarted() { return setupStarted; }
      /** Get number of setup tasks that finished */
      public long getSetupFinished() { return setupFinished; }
+    /** Get job's UberTask/non-UberTask status */
+    public boolean isUber() { return isUber; }
 
-     /** Create summary information for the parsed job */
+    /** Create summary information for the parsed job */
     public SummarizedJob(JobInfo job) {
       tasks = job.getAllTasks();
 

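With the new fields, a consumer of SummarizedJob can distinguish an uber job and report its rolled-up subtask counts. A hedged sketch, assuming SummarizedJob remains a public static nested class and jobInfo comes from JobHistoryParser (not shown here):

    import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer.SummarizedJob;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    // Hedged sketch of consuming the new accessors; 'jobInfo' would come
    // from JobHistoryParser.parse(), which is not shown here.
    public class UberSummarySketch {
      static void printSummary(JobInfo jobInfo) {
        SummarizedJob sj = new SummarizedJob(jobInfo);
        if (sj.isUber()) {
          System.out.println("uber job: " + sj.getNumUberSubMaps() + " sub-maps, "
                             + sj.getNumUberSubReduces() + " sub-reduces");
        } else {
          System.out.println(sj.getTotalMaps() + " maps, " + sj.getTotalReduces()
                             + " reduces");
        }
      }
    }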

