hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r726850 [2/4] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/h...
Date: Mon, 15 Dec 2008 22:21:35 GMT
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Dec 15 14:21:32 2008
@@ -30,6 +30,8 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
@@ -37,7 +39,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -50,12 +52,13 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -72,7 +75,6 @@
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
-  private InputSplit instantiatedSplit = null;
   private final static int APPROX_HEADER_LENGTH = 150;
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
@@ -131,11 +133,6 @@
     split.readFields(in);
   }
 
-  @Override
-  InputSplit getInputSplit() throws UnsupportedOperationException {
-    return instantiatedSplit;
-  }
-
   /**
    * This class wraps the user's record reader to update the counters and progress
    * as records are read.
@@ -147,14 +144,16 @@
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private TaskReporter reporter;
     private long beforePos = -1;
     private long afterPos = -1;
     
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) 
+    TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
       throws IOException{
       rawIn = raw;
-      inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
-      inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+      inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
+      inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
+      this.reporter = reporter;
     }
 
     public K createKey() {
@@ -181,7 +180,7 @@
      
     protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
-      setProgress(getProgress());
+      reporter.setProgress(getProgress());
       beforePos = getPos();
       boolean ret = rawIn.next(key, value);
       afterPos = getPos();
@@ -193,6 +192,9 @@
     public float getProgress() throws IOException {
       return rawIn.getProgress();
     }
+    TaskReporter getTaskReporter() {
+      return reporter;
+    }
   }
 
   /**
@@ -207,11 +209,11 @@
     private Counters.Counter skipRecCounter;
     private long recIndex = -1;
     
-    SkippingRecordReader(RecordReader<K,V> raw, Counters counters, 
-        TaskUmbilicalProtocol umbilical) throws IOException{
-      super(raw,counters);
+    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
+                         TaskReporter reporter) throws IOException{
+      super(raw, reporter);
       this.umbilical = umbilical;
-      this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+      this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
       this.toWriteSkipRecs = toWriteSkipRecs() &&  
         SkipBadRecords.getSkipOutputPath(conf)!=null;
       skipIt = getSkipRanges().skipRangeIterator();
@@ -261,44 +263,50 @@
               skipFile.getFileSystem(conf), conf, skipFile,
               (Class<K>) createKey().getClass(),
               (Class<V>) createValue().getClass(), 
-              CompressionType.BLOCK, getReporter(umbilical));
+              CompressionType.BLOCK, getTaskReporter());
       }
       skipWriter.append(key, value);
     }
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
-    throws IOException {
-
-    final Reporter reporter = getReporter(umbilical);
+    throws IOException, ClassNotFoundException, InterruptedException {
 
     // start thread that will handle communication with parent
-    startCommunicationThread(umbilical);
+    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+    reporter.startCommunicationThread();
+    boolean useNewApi = job.getUseNewMapper();
+    initialize(job, getJobID(), reporter, useNewApi);
 
-    initialize(job, reporter);
     // check if it is a cleanupJobTask
     if (cleanupJob) {
-      runCleanup(umbilical);
+      runCleanup(umbilical, reporter);
       return;
     }
     if (setupJob) {
-      runSetupJob(umbilical);
+      runSetupJob(umbilical, reporter);
       return;
     }
 
-    int numReduceTasks = conf.getNumReduceTasks();
-    LOG.info("numReduceTasks: " + numReduceTasks);
-    MapOutputCollector collector = null;
-    if (numReduceTasks > 0) {
-      collector = new MapOutputBuffer(umbilical, job, reporter);
-    } else { 
-      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+    if (useNewApi) {
+      runNewMapper(job, split, umbilical, reporter);
+    } else {
+      runOldMapper(job, split, umbilical, reporter);
     }
+    done(umbilical, reporter);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runOldMapper(final JobConf job,
+                    final BytesWritable rawSplit,
+                    final TaskUmbilicalProtocol umbilical,
+                    TaskReporter reporter) throws IOException {
+    InputSplit inputSplit = null;
     // reinstantiate the split
     try {
-      instantiatedSplit = (InputSplit) 
+      inputSplit = (InputSplit) 
         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
     } catch (ClassNotFoundException exp) {
       IOException wrap = new IOException("Split class " + splitClass + 
@@ -308,24 +316,28 @@
     }
     DataInputBuffer splitBuffer = new DataInputBuffer();
     splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    instantiatedSplit.readFields(splitBuffer);
+    inputSplit.readFields(splitBuffer);
     
-    // if it is a file split, we can give more details
-    if (instantiatedSplit instanceof FileSplit) {
-      FileSplit fileSplit = (FileSplit) instantiatedSplit;
-      job.set("map.input.file", fileSplit.getPath().toString());
-      job.setLong("map.input.start", fileSplit.getStart());
-      job.setLong("map.input.length", fileSplit.getLength());
-    }
-      
-    RecordReader rawIn =                  // open input
-      job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = isSkipping() ? 
-        new SkippingRecordReader(rawIn, getCounters(), umbilical) :
-        new TrackedRecordReader(rawIn, getCounters());
+    updateJobWithSplit(job, inputSplit);
+    reporter.setInputSplit(inputSplit);
+
+    RecordReader<INKEY,INVALUE> rawIn =                  // open input
+      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
+    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
+        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
+        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
     job.setBoolean("mapred.skip.on", isSkipping());
 
-    MapRunnable runner =
+
+    int numReduceTasks = conf.getNumReduceTasks();
+    LOG.info("numReduceTasks: " + numReduceTasks);
+    MapOutputCollector collector = null;
+    if (numReduceTasks > 0) {
+      collector = new MapOutputBuffer(umbilical, job, reporter);
+    } else { 
+      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+    }
+    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
@@ -336,7 +348,168 @@
       in.close();                               // close input
       collector.close();
     }
-    done(umbilical);
+  }
+
+  /**
+   * Update the job with details about the file split
+   * @param job the job configuration to update
+   * @param inputSplit the file split
+   */
+  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      job.set("map.input.file", fileSplit.getPath().toString());
+      job.setLong("map.input.start", fileSplit.getStart());
+      job.setLong("map.input.length", fileSplit.getLength());
+    }
+  }
+
+  static class NewTrackingRecordReader<K,V> 
+    extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
+    private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
+    private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+    
+    NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
+                            TaskReporter reporter) {
+      this.real = real;
+      this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
+    }
+
+    @Override
+    public void close() throws IOException {
+      real.close();
+    }
+
+    @Override
+    public K getCurrentKey() throws IOException, InterruptedException {
+      return real.getCurrentKey();
+    }
+
+    @Override
+    public V getCurrentValue() throws IOException, InterruptedException {
+      return real.getCurrentValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return real.getProgress();
+    }
+
+    @Override
+    public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
+                           org.apache.hadoop.mapreduce.TaskAttemptContext context
+                           ) throws IOException, InterruptedException {
+      real.initialize(split, context);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean result = real.nextKeyValue();
+      if (result) {
+        inputRecordCounter.increment(1);
+      }
+      return result;
+    }
+  }
+
+  private class NewOutputCollector<K,V>
+    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+    private MapOutputCollector<K,V> collector;
+
+    NewOutputCollector(JobConf job, 
+                       TaskUmbilicalProtocol umbilical,
+                       TaskReporter reporter
+                       ) throws IOException {
+      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException {
+      collector.collect(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      collector.flush();
+      collector.close();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runNewMapper(final JobConf job,
+                    final BytesWritable rawSplit,
+                    final TaskUmbilicalProtocol umbilical,
+                    TaskReporter reporter
+                    ) throws IOException, ClassNotFoundException,
+                             InterruptedException {
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
+    // make a mapper
+    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
+      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
+        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
+    // make the input format
+    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
+      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
+        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
+    // rebuild the input split
+    org.apache.hadoop.mapreduce.InputSplit split = null;
+    DataInputBuffer splitBuffer = new DataInputBuffer();
+    splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
+    SerializationFactory factory = new SerializationFactory(job);
+    Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
+      deserializer = 
+        (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) 
+        factory.getDeserializer(job.getClassByName(splitClass));
+    deserializer.open(splitBuffer);
+    split = deserializer.deserialize(null);
+
+    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
+      new NewTrackingRecordReader<INKEY,INVALUE>
+          (inputFormat.createRecordReader(split, taskContext), reporter);
+    
+    job.setBoolean("mapred.skip.on", isSkipping());
+    org.apache.hadoop.mapreduce.RecordWriter output = null;
+    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+         mapperContext = null;
+    try {
+      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
+        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
+        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
+                     Configuration.class,
+                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
+                     org.apache.hadoop.mapreduce.RecordReader.class,
+                     org.apache.hadoop.mapreduce.RecordWriter.class,
+                     org.apache.hadoop.mapreduce.OutputCommitter.class,
+                     org.apache.hadoop.mapreduce.StatusReporter.class,
+                     org.apache.hadoop.mapreduce.InputSplit.class});
+
+      // get an output object
+      if (job.getNumReduceTasks() == 0) {
+        output = outputFormat.getRecordWriter(taskContext);
+      } else {
+        output = new NewOutputCollector(job, umbilical, reporter);
+      }
+
+      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
+                                                     input, output, committer,
+                                                     reporter, split);
+
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      input.close();
+      output.close(mapperContext);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Can't find Context constructor", e);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
   }
 
   interface MapOutputCollector<K, V>
@@ -353,21 +526,20 @@
  
     private RecordWriter<K, V> out = null;
 
-    private Reporter reporter = null;
+    private TaskReporter reporter = null;
 
     private final Counters.Counter mapOutputRecordCounter;
 
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
-        JobConf job, Reporter reporter) throws IOException {
+        JobConf job, TaskReporter reporter) throws IOException {
       this.reporter = reporter;
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
 
-      Counters counters = getCounters();
-      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
+      mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
     }
 
     public void close() throws IOException {
@@ -393,7 +565,7 @@
     private final int partitions;
     private final Partitioner<K, V> partitioner;
     private final JobConf job;
-    private final Reporter reporter;
+    private final TaskReporter reporter;
     private final Class<K> keyClass;
     private final Class<V> valClass;
     private final RawComparator<K> comparator;
@@ -454,7 +626,7 @@
 
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           Reporter reporter) throws IOException {
+                           TaskReporter reporter) throws IOException {
       this.job = job;
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
@@ -504,11 +676,10 @@
       valSerializer = serializationFactory.getSerializer(valClass);
       valSerializer.open(bb);
       // counters
-      Counters counters = getCounters();
-      mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
-      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
-      combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
-      combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
+      mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
+      mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
+      combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS);
+      combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
       // compression
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
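
For orientation, a minimal sketch (not part of this commit) of a user mapper written against the new org.apache.hadoop.mapreduce API that the runNewMapper() path above drives; the WordCountMapper class and its key/value types are invented for illustration.

    // Illustrative sketch only -- not part of r726850. runNewMapper() above
    // instantiates a class like this via ReflectionUtils, wraps the record
    // reader/writer in NewTrackingRecordReader/NewOutputCollector, and then
    // calls mapper.run(mapperContext).
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
          word.set(token);
          context.write(word, ONE);   // routed through NewOutputCollector
        }
      }
    }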

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Mapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Mapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Mapper.java Mon Dec 15 14:21:32 2008
@@ -127,7 +127,9 @@
  * @see MapReduceBase
  * @see MapRunnable
  * @see SequenceFile
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Mapper} instead.
  */
+@Deprecated
 public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
   
   /** 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java Mon Dec 15 14:21:32 2008
@@ -53,9 +53,11 @@
  * @see FileOutputCommitter 
  * @see JobContext
  * @see TaskAttemptContext 
- *
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.OutputCommitter} instead.
  */
-public abstract class OutputCommitter {
+@Deprecated
+public abstract class OutputCommitter 
+                extends org.apache.hadoop.mapreduce.OutputCommitter {
   /**
    * For the framework to setup the job output during initialization
    * 
@@ -110,4 +112,74 @@
    */
   public abstract void abortTask(TaskAttemptContext taskContext)
   throws IOException;
+
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
+                             ) throws IOException {
+    setupJob((JobContext) jobContext);
+  }
+
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
+                               ) throws IOException {
+    cleanupJob((JobContext) context);
+  }
+
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final 
+  void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+                 ) throws IOException {
+    setupTask((TaskAttemptContext) taskContext);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final boolean 
+    needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+                    ) throws IOException {
+    return needsTaskCommit((TaskAttemptContext) taskContext);
+  }
+
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final 
+  void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+                  ) throws IOException {
+    commitTask((TaskAttemptContext) taskContext);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final 
+  void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+                 ) throws IOException {
+    abortTask((TaskAttemptContext) taskContext);
+  }
 }
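
The final bridge methods above mean an existing committer keeps overriding only the org.apache.hadoop.mapred signatures; a hedged sketch follows (the NoOpCommitter class is invented for illustration, and it assumes the framework passes the old mapred context objects through the new-API entry points, as the casts above require).

    // Illustrative sketch only -- NoOpCommitter is an invented example, not
    // part of this commit. It overrides only the old org.apache.hadoop.mapred
    // signatures; the final bridge methods added above forward new-API calls
    // here by casting the context objects.
    import java.io.IOException;

    import org.apache.hadoop.mapred.JobContext;
    import org.apache.hadoop.mapred.OutputCommitter;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    public class NoOpCommitter extends OutputCommitter {
      public void setupJob(JobContext jobContext) throws IOException { }
      public void cleanupJob(JobContext jobContext) throws IOException { }
      public void setupTask(TaskAttemptContext taskContext) throws IOException { }
      public boolean needsTaskCommit(TaskAttemptContext taskContext)
          throws IOException {
        return false;                       // nothing to commit
      }
      public void commitTask(TaskAttemptContext taskContext) throws IOException { }
      public void abortTask(TaskAttemptContext taskContext) throws IOException { }
    }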

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputFormat.java Mon Dec 15 14:21:32 2008
@@ -42,7 +42,9 @@
  * 
  * @see RecordWriter
  * @see JobConf
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.OutputFormat} instead.
  */
+@Deprecated
 public interface OutputFormat<K, V> {
 
   /** 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Partitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Partitioner.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Partitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Partitioner.java Mon Dec 15 14:21:32 2008
@@ -29,7 +29,9 @@
  * record) is sent for reduction.</p>
  * 
  * @see Reducer
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
  */
+@Deprecated
 public interface Partitioner<K2, V2> extends JobConfigurable {
   
   /** 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RawKeyValueIterator.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RawKeyValueIterator.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RawKeyValueIterator.java Mon Dec 15 14:21:32 2008
@@ -26,7 +26,7 @@
  * <code>RawKeyValueIterator</code> is an iterator used to iterate over
  * the raw keys and values during sort/merge of intermediate data. 
  */
-interface RawKeyValueIterator {
+public interface RawKeyValueIterator {
   /** 
    * Gets the current raw key.
    * 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Dec 15 14:21:32 2008
@@ -25,6 +25,8 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.Math;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -73,6 +75,7 @@
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -258,21 +261,23 @@
      private SequenceFile.Writer skipWriter;
      private boolean toWriteSkipRecs;
      private boolean hasNext;
+     private TaskReporter reporter;
      
      public SkippingReduceValuesIterator(RawKeyValueIterator in,
          RawComparator<KEY> comparator, Class<KEY> keyClass,
-         Class<VALUE> valClass, Configuration conf, Progressable reporter,
+         Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
          TaskUmbilicalProtocol umbilical) throws IOException {
        super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
-         getCounters().findCounter(Counter.REDUCE_SKIPPED_GROUPS);
+         reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
        this.skipRecCounter = 
-         getCounters().findCounter(Counter.REDUCE_SKIPPED_RECORDS);
+         reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
        this.toWriteSkipRecs = toWriteSkipRecs() &&  
          SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
        this.valClass = valClass;
+       this.reporter = reporter;
        skipIt = getSkipRanges().skipRangeIterator();
        mayBeSkip();
      }
@@ -326,7 +331,7 @@
          skipWriter = SequenceFile.createWriter(
                skipFile.getFileSystem(conf), conf, skipFile,
                keyClass, valClass, 
-               CompressionType.BLOCK, getReporter(umbilical));
+               CompressionType.BLOCK, reporter);
        }
        skipWriter.append(key, value);
      }
@@ -335,9 +340,8 @@
   @Override
   @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
-    throws IOException {
+    throws IOException, InterruptedException, ClassNotFoundException {
     job.setBoolean("mapred.skip.on", isSkipping());
-    Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
 
     if (!cleanupJob && !setupJob) {
       copyPhase = getProgress().addPhase("copy");
@@ -345,17 +349,18 @@
       reducePhase = getProgress().addPhase("reduce");
     }
     // start thread that will handle communication with parent
-    startCommunicationThread(umbilical);
-    final Reporter reporter = getReporter(umbilical);
-    initialize(job, reporter);
+    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+    reporter.startCommunicationThread();
+    boolean useNewApi = job.getUseNewReducer();
+    initialize(job, getJobID(), reporter, useNewApi);
 
     // check if it is a cleanupJobTask
     if (cleanupJob) {
-      runCleanup(umbilical);
+      runCleanup(umbilical, reporter);
       return;
     }
     if (setupJob) {
-      runSetupJob(umbilical);
+      runSetupJob(umbilical, reporter);
       return;
     }
     
@@ -364,7 +369,7 @@
 
     boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
     if (!isLocal) {
-      reduceCopier = new ReduceCopier(umbilical, job);
+      reduceCopier = new ReduceCopier(umbilical, job, reporter);
       if (!reduceCopier.fetchOutputs()) {
         if(reduceCopier.mergeThrowable instanceof FSError) {
           LOG.error("Task: " + getTaskID() + " - FSError: " + 
@@ -394,17 +399,42 @@
     
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
+    Class keyClass = job.getMapOutputKeyClass();
+    Class valueClass = job.getMapOutputValueClass();
+    RawComparator comparator = job.getOutputValueGroupingComparator();
+
+    if (useNewApi) {
+      runNewReducer(job, umbilical, reporter, rIter, comparator, 
+                    keyClass, valueClass);
+    } else {
+      runOldReducer(job, umbilical, reporter, rIter, comparator, 
+                    keyClass, valueClass);
+    }
+    done(umbilical, reporter);
+  }
 
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runOldReducer(JobConf job,
+                     TaskUmbilicalProtocol umbilical,
+                     final TaskReporter reporter,
+                     RawKeyValueIterator rIter,
+                     RawComparator<INKEY> comparator,
+                     Class<INKEY> keyClass,
+                     Class<INVALUE> valueClass) throws IOException {
+    Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
+      ReflectionUtils.newInstance(job.getReducerClass(), job);
     // make output collector
     String finalName = getOutputName(getPartition());
 
     FileSystem fs = FileSystem.get(job);
 
-    final RecordWriter out = 
+    final RecordWriter<OUTKEY,OUTVALUE> out = 
       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
     
-    OutputCollector collector = new OutputCollector() {
-        public void collect(Object key, Object value)
+    OutputCollector<OUTKEY,OUTVALUE> collector = 
+      new OutputCollector<OUTKEY,OUTVALUE>() {
+        public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
           out.write(key, value);
           reduceOutputCounter.increment(1);
@@ -415,18 +445,16 @@
     
     // apply reduce function
     try {
-      Class keyClass = job.getMapOutputKeyClass();
-      Class valClass = job.getMapOutputValueClass();
       //increment processed counter only if skipping feature is enabled
       boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
         SkipBadRecords.getAutoIncrReducerProcCount(job);
       
-      ReduceValuesIterator values = isSkipping() ? 
-          new SkippingReduceValuesIterator(rIter, 
-              job.getOutputValueGroupingComparator(), keyClass, valClass, 
+      ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
+          new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
+              comparator, keyClass, valueClass, 
               job, reporter, umbilical) :
-          new ReduceValuesIterator(rIter, 
-          job.getOutputValueGroupingComparator(), keyClass, valClass, 
+          new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
+          job.getOutputValueGroupingComparator(), keyClass, valueClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
@@ -455,13 +483,94 @@
       
       throw ioe;
     }
-    done(umbilical);
+  }
+
+  static class NewTrackingRecordWriter<K,V> 
+      extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+    private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
+    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
+  
+    NewTrackingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter<K,V> real,
+                            org.apache.hadoop.mapreduce.Counter recordCounter) {
+      this.real = real;
+      this.outputRecordCounter = recordCounter;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+    InterruptedException {
+      real.close(context);
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException, InterruptedException {
+      real.write(key,value);
+      outputRecordCounter.increment(1);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runNewReducer(JobConf job,
+                     final TaskUmbilicalProtocol umbilical,
+                     final Reporter reporter,
+                     RawKeyValueIterator rIter,
+                     RawComparator<INKEY> comparator,
+                     Class<INKEY> keyClass,
+                     Class<INVALUE> valueClass
+                     ) throws IOException,InterruptedException, 
+                              ClassNotFoundException {
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
+      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
+      (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
+        outputFormat.getRecordWriter(taskContext);
+    job.setBoolean("mapred.skip.on", isSkipping());
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+         reducerContext = null;
+    try {
+      Constructor<org.apache.hadoop.mapreduce.Reducer.Context> contextConstructor =
+        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
+        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
+            Configuration.class,
+            org.apache.hadoop.mapreduce.TaskAttemptID.class,
+            RawKeyValueIterator.class,
+            org.apache.hadoop.mapreduce.RecordWriter.class,
+            org.apache.hadoop.mapreduce.OutputCommitter.class,
+            org.apache.hadoop.mapreduce.StatusReporter.class,
+            RawComparator.class,
+            Class.class,
+            Class.class});
+
+      reducerContext = contextConstructor.newInstance(reducer, job, 
+                                                      getTaskID(),
+                                                      rIter, output, committer,
+                                                      reporter, comparator, 
+                                                      keyClass, valueClass);
+
+      reducer.run(reducerContext);
+      output.close(reducerContext);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Can't find Context constructor", e);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
   }
 
   class ReduceCopier<K, V> implements MRConstants {
 
     /** Reference to the umbilical object */
     private TaskUmbilicalProtocol umbilical;
+    private final TaskReporter reporter;
     
     /** Reference to the task object */
     
@@ -1560,10 +1669,11 @@
       conf.setClassLoader(loader);
     }
     
-    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
-      throws IOException {
+    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
+                        TaskReporter reporter)throws IOException {
       
       configureClasspath(conf);
+      this.reporter = reporter;
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;      
       this.reduceTask = ReduceTask.this;
@@ -1650,8 +1760,6 @@
       
       copiers = new ArrayList<MapOutputCopier>(numCopiers);
       
-      Reporter reporter = getReporter(umbilical);
-
       // start all the copying threads
       for (int i=0; i < numCopiers; i++) {
         MapOutputCopier copier = new MapOutputCopier(conf, reporter);
@@ -2272,7 +2380,6 @@
                          codec, null);
             RawKeyValueIterator iter  = null;
             Path tmpDir = new Path(reduceTask.getTaskID().toString());
-            final Reporter reporter = getReporter(umbilical);
             try {
               iter = Merger.merge(conf, rfs,
                                   conf.getMapOutputKeyClass(),
@@ -2312,7 +2419,7 @@
     }
 
     private class InMemFSMergeThread extends Thread {
-     
+      
       public InMemFSMergeThread() {
         setName("Thread for merging in memory files");
         setDaemon(true);
@@ -2367,7 +2474,6 @@
                      codec, null);
 
         RawKeyValueIterator rIter = null;
-        final Reporter reporter = getReporter(umbilical);
         try {
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                    " segments...");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reducer.java Mon Dec 15 14:21:32 2008
@@ -160,7 +160,9 @@
  * @see Partitioner
  * @see Reporter
  * @see MapReduceBase
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Reducer} instead.
  */
+@Deprecated
 public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
   
   /** 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java Mon Dec 15 14:21:32 2008
@@ -47,10 +47,13 @@
       }
       public void progress() {
       }
+      public Counter getCounter(Enum<?> name) {
+        return null;
+      }
       public Counter getCounter(String group, String name) {
         return null;
       }
-      public void incrCounter(Enum key, long amount) {
+      public void incrCounter(Enum<?> key, long amount) {
       }
       public void incrCounter(String group, String counter, long amount) {
       }
@@ -69,6 +72,14 @@
   /**
    * Get the {@link Counter} of the given group with the given name.
    * 
+   * @param name counter name
+   * @return the <code>Counter</code> of the given group/name.
+   */
+  public abstract Counter getCounter(Enum<?> name);
+
+  /**
+   * Get the {@link Counter} of the given group with the given name.
+   * 
    * @param group counter group
    * @param name counter name
    * @return the <code>Counter</code> of the given group/name.
@@ -84,7 +95,7 @@
    * @param amount A non-negative amount by which the counter is to 
    *               be incremented.
    */
-  public abstract void incrCounter(Enum key, long amount);
+  public abstract void incrCounter(Enum<?> key, long amount);
   
   /**
    * Increments the counter identified by the group and counter name
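
A hedged sketch of what the new Enum-keyed overload enables from old-API user code; RecordCounting, MyCounters, and countGood() are invented names.

    // Illustrative sketch only. The Enum-keyed getCounter(...) added above
    // hands back a Counter that can be cached and incremented directly (the
    // NewTrackingRecordReader in MapTask uses the same pattern), instead of
    // calling incrCounter(Enum, long) on every record.
    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.Reporter;

    public class RecordCounting {
      enum MyCounters { GOOD_RECORDS }

      static void countGood(Reporter reporter) {
        Counters.Counter good = reporter.getCounter(MyCounters.GOOD_RECORDS);
        if (good != null) {            // Reporter.NULL returns null counters
          good.increment(1);
        }
      }
    }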

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java Mon Dec 15 14:21:32 2008
@@ -181,4 +181,12 @@
    * @throws IOException
    */
   public Counters getCounters() throws IOException;
+  
+  /**
+   * Gets the diagnostic messages for a given task attempt.
+   * @param taskid
+   * @return the list of diagnostic messages for the task
+   * @throws IOException
+   */
+  public String[] getTaskDiagnostics(TaskAttemptID taskid) throws IOException;
 }
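
A hedged client-side sketch of the new call; the PrintDiagnostics class and the id strings passed on the command line are invented, and the surrounding JobClient calls are assumed to be available as in the rest of the old client API.

    // Illustrative sketch only -- not part of r726850. Prints the diagnostic
    // messages for one task attempt; args[0] is a job id string, args[1] a
    // task attempt id string.
    import java.io.IOException;

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class PrintDiagnostics {
      public static void main(String[] args) throws IOException {
        JobClient client = new JobClient(new JobConf());
        RunningJob job = client.getJob(JobID.forName(args[0]));
        TaskAttemptID attempt = TaskAttemptID.forName(args[1]);
        for (String message : job.getTaskDiagnostics(attempt)) {
          System.err.println(message);
        }
      }
    }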

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileInputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileInputFormat.java Mon Dec 15 14:21:32 2008
@@ -27,7 +27,12 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.MapFile;
 
-/** An {@link InputFormat} for {@link SequenceFile}s. */
+/** An {@link InputFormat} for {@link SequenceFile}s. 
+ * @deprecated Use 
+ *  {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat} 
+ *  instead.
+ */
+@Deprecated
 public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
 
   public SequenceFileInputFormat() {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -32,7 +32,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.*;
 
-/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
+/** An {@link OutputFormat} that writes {@link SequenceFile}s. 
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat} 
+ *   instead.
+ */
+@Deprecated
 public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
 
   public RecordWriter<K, V> getRecordWriter(
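
The replacement classes named in the two @deprecated notes above live under org.apache.hadoop.mapreduce.lib; a hedged sketch of wiring them into a new-API job follows (the NewApiJobSetup class, the key/value types, and the job name are invented).

    // Illustrative sketch only -- not part of r726850. Uses the new-API
    // SequenceFile formats that the @deprecated javadoc above points to.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class NewApiJobSetup {
      public static Job configure(Configuration conf) throws IOException {
        Job job = new Job(conf, "sequence-file copy");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job;
      }
    }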

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Dec 15 14:21:32 2008
@@ -22,10 +22,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,12 +107,11 @@
   ////////////////////////////////////////////
 
   private String jobFile;                         // job configuration file
-  private final TaskAttemptID taskId;             // unique, includes job id
+  private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
   protected boolean setupJob = false;
-  private Thread pingProgressThread;
   
   //skip ranges based on failed ranges from previous attempts
   private SortedRanges skipRanges = new SortedRanges();
@@ -132,6 +129,8 @@
   private final static int MAX_RETRIES = 10;
   protected JobContext jobContext;
   protected TaskAttemptContext taskContext;
+  protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
+  protected org.apache.hadoop.mapreduce.OutputCommitter committer;
   private volatile boolean commitPending = false;
   protected final Counters.Counter spilledRecordsCounter;
 
@@ -168,7 +167,7 @@
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
-  public Counters getCounters() { return counters; }
+  Counters getCounters() { return counters; }
   
   /**
    * Get the job name for this task.
@@ -271,7 +270,7 @@
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
-    taskId.readFields(in);
+    taskId = TaskAttemptID.read(in);
     partition = in.readInt();
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
@@ -315,7 +314,7 @@
    * @param umbilical for progress reports
    */
   public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
-    throws IOException;
+    throws IOException, ClassNotFoundException, InterruptedException;
 
 
   /** Return an approprate thread runner for this task. 
@@ -330,160 +329,194 @@
 
   // Current counters
   private transient Counters counters = new Counters();
-  
-  /**
-   * flag that indicates whether progress update needs to be sent to parent.
-   * If true, it has been set. If false, it has been reset. 
-   * Using AtomicBoolean since we need an atomic read & reset method. 
-   */  
-  private AtomicBoolean progressFlag = new AtomicBoolean(false);
+
   /* flag to track whether task is done */
   private AtomicBoolean taskDone = new AtomicBoolean(false);
-  // getters and setters for flag
-  private void setProgressFlag() {
-    progressFlag.set(true);
-  }
-  private boolean resetProgressFlag() {
-    return progressFlag.getAndSet(false);
-  }
   
   public abstract boolean isMapTask();
 
   public Progress getProgress() { return taskProgress; }
 
-  InputSplit getInputSplit() throws UnsupportedOperationException {
-    throw new UnsupportedOperationException("Input only available on map");
-  }
-
-  /** 
-   * The communication thread handles communication with the parent (Task Tracker). 
-   * It sends progress updates if progress has been made or if the task needs to 
-   * let the parent know that it's alive. It also pings the parent to see if it's alive. 
-   */
-  protected void startCommunicationThread(final TaskUmbilicalProtocol umbilical) {
-    pingProgressThread = new Thread(new Runnable() {
-        public void run() {
-          final int MAX_RETRIES = 3;
-          int remainingRetries = MAX_RETRIES;
-          // get current flag value and reset it as well
-          boolean sendProgress = resetProgressFlag();
-          while (!taskDone.get()) {
-            try {
-              boolean taskFound = true; // whether TT knows about this task
-              // sleep for a bit
-              try {
-                Thread.sleep(PROGRESS_INTERVAL);
-              } 
-              catch (InterruptedException e) {
-                LOG.debug(getTaskID() + " Progress/ping thread exiting " +
-                                        "since it got interrupted");
-                break;
-              }
-              
-              if (sendProgress) {
-                // we need to send progress update
-                updateCounters();
-                if (commitPending) {
-                  taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                          taskProgress.get(),
-                                          taskProgress.toString(), 
-                                          counters);
-                } else {
-                  taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                          taskProgress.get(),
-                                          taskProgress.toString(), 
-                                          counters);
-                }
-                taskFound = umbilical.statusUpdate(taskId, taskStatus);
-                taskStatus.clearStatus();
-              }
-              else {
-                // send ping 
-                taskFound = umbilical.ping(taskId);
-              }
-              
-              // if Task Tracker is not aware of our task ID (probably because it died and 
-              // came back up), kill ourselves
-              if (!taskFound) {
-                LOG.warn("Parent died.  Exiting "+taskId);
-                System.exit(66);
-              }
-              
-              sendProgress = resetProgressFlag(); 
-              remainingRetries = MAX_RETRIES;
-            } 
-            catch (Throwable t) {
-              LOG.info("Communication exception: " + StringUtils.stringifyException(t));
-              remainingRetries -=1;
-              if (remainingRetries == 0) {
-                ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
-                LOG.warn("Last retry, killing "+taskId);
-                System.exit(65);
-              }
-            }
-          }
-        }
-      }, "Comm thread for "+taskId);
-    pingProgressThread.setDaemon(true);
-    pingProgressThread.start();
-    LOG.debug(getTaskID() + " Progress/ping thread started");
-  }
-
-  public void initialize(JobConf job, Reporter reporter) 
-  throws IOException {
-    jobContext = new JobContext(job, reporter);
+  public void initialize(JobConf job, JobID id, 
+                         Reporter reporter,
+                         boolean useNewApi) throws IOException, 
+                                                   ClassNotFoundException,
+                                                   InterruptedException {
+    jobContext = new JobContext(job, id, reporter);
     taskContext = new TaskAttemptContext(job, taskId, reporter);
-    OutputCommitter committer = conf.getOutputCommitter();
+    if (useNewApi) {
+      LOG.debug("using new api for output committer");
+      outputFormat =
+        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
+      committer = outputFormat.getOutputCommitter(taskContext);
+    } else {
+      committer = conf.getOutputCommitter();
+    }
     committer.setupTask(taskContext);
   }
   
-  protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
-    throws IOException 
-  {
-    return new Reporter() {
-        public void setStatus(String status) {
-          taskProgress.setStatus(status);
-          // indicate that progress update needs to be sent
-          setProgressFlag();
-        }
-        public void progress() {
-          // indicate that progress update needs to be sent
-          setProgressFlag();
-        }
-        public Counters.Counter getCounter(String group, String name) {
-          Counters.Counter counter = null;
-          if (counters != null) {
-            counter = counters.findCounter(group, name);
+  protected class TaskReporter 
+      extends org.apache.hadoop.mapreduce.StatusReporter
+      implements Runnable, Reporter {
+    private TaskUmbilicalProtocol umbilical;
+    private InputSplit split = null;
+    private Progress taskProgress;
+    private Thread pingThread = null;
+    /**
+     * flag that indicates whether progress update needs to be sent to parent.
+     * If true, it has been set. If false, it has been reset. 
+     * Using AtomicBoolean since we need an atomic read & reset method. 
+     */  
+    private AtomicBoolean progressFlag = new AtomicBoolean(false);
+    
+    TaskReporter(Progress taskProgress,
+                 TaskUmbilicalProtocol umbilical) {
+      this.umbilical = umbilical;
+      this.taskProgress = taskProgress;
+    }
+    // getters and setters for flag
+    void setProgressFlag() {
+      progressFlag.set(true);
+    }
+    boolean resetProgressFlag() {
+      return progressFlag.getAndSet(false);
+    }
+    public void setStatus(String status) {
+      taskProgress.setStatus(status);
+      // indicate that progress update needs to be sent
+      setProgressFlag();
+    }
+    public void setProgress(float progress) {
+      taskProgress.set(progress);
+      // indicate that progress update needs to be sent
+      setProgressFlag();
+    }
+    public void progress() {
+      // indicate that progress update needs to be sent
+      setProgressFlag();
+    }
+    public Counters.Counter getCounter(String group, String name) {
+      Counters.Counter counter = null;
+      if (counters != null) {
+        counter = counters.findCounter(group, name);
+      }
+      return counter;
+    }
+    public Counters.Counter getCounter(Enum<?> name) {
+      return counters == null ? null : counters.findCounter(name);
+    }
+    public void incrCounter(Enum key, long amount) {
+      if (counters != null) {
+        counters.incrCounter(key, amount);
+      }
+      setProgressFlag();
+    }
+    public void incrCounter(String group, String counter, long amount) {
+      if (counters != null) {
+        counters.incrCounter(group, counter, amount);
+      }
+      if(skipping && SkipBadRecords.COUNTER_GROUP.equals(group) && (
+          SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) ||
+          SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.equals(counter))) {
+        //if application reports the processed records, move the 
+        //currentRecStartIndex to the next.
+        //currentRecStartIndex is the start index which has not yet been 
+        //finished and is still in task's stomach.
+        for(int i=0;i<amount;i++) {
+          currentRecStartIndex = currentRecIndexIterator.next();
+        }
+      }
+      setProgressFlag();
+    }
+    public void setInputSplit(InputSplit split) {
+      this.split = split;
+    }
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      if (split == null) {
+        throw new UnsupportedOperationException("Input only available on map");
+      } else {
+        return split;
+      }
+    }    
+    /** 
+     * The communication thread handles communication with the parent (Task Tracker). 
+     * It sends progress updates if progress has been made or if the task needs to 
+     * let the parent know that it's alive. It also pings the parent to see if it's alive. 
+     */
+    public void run() {
+      final int MAX_RETRIES = 3;
+      int remainingRetries = MAX_RETRIES;
+      // get current flag value and reset it as well
+      boolean sendProgress = resetProgressFlag();
+      while (!taskDone.get()) {
+        try {
+          boolean taskFound = true; // whether TT knows about this task
+          // sleep for a bit
+          try {
+            Thread.sleep(PROGRESS_INTERVAL);
+          } 
+          catch (InterruptedException e) {
+            LOG.debug(getTaskID() + " Progress/ping thread exiting " +
+            "since it got interrupted");
+            break;
           }
-          return counter;
-        }
-        public void incrCounter(Enum key, long amount) {
-          if (counters != null) {
-            counters.incrCounter(key, amount);
+
+          if (sendProgress) {
+            // we need to send progress update
+            updateCounters();
+            if (commitPending) {
+              taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+                                      taskProgress.get(),
+                                      taskProgress.toString(), 
+                                      counters);
+            } else {
+              taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+                                      taskProgress.get(),
+                                      taskProgress.toString(), 
+                                      counters);
+            }
+            taskFound = umbilical.statusUpdate(taskId, taskStatus);
+            taskStatus.clearStatus();
           }
-          setProgressFlag();
-        }
-        public void incrCounter(String group, String counter, long amount) {
-          if (counters != null) {
-            counters.incrCounter(group, counter, amount);
+          else {
+            // send ping 
+            taskFound = umbilical.ping(taskId);
           }
-          if(skipping && SkipBadRecords.COUNTER_GROUP.equals(group) && (
-              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) ||
-              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.equals(counter))) {
-            //if application reports the processed records, move the 
-            //currentRecStartIndex to the next.
-            //currentRecStartIndex is the start index which has not yet been 
-            //finished and is still in task's stomach.
-            for(int i=0;i<amount;i++) {
-              currentRecStartIndex = currentRecIndexIterator.next();
-            }
+
+          // if Task Tracker is not aware of our task ID (probably because it died and 
+          // came back up), kill ourselves
+          if (!taskFound) {
+            LOG.warn("Parent died.  Exiting "+taskId);
+            System.exit(66);
+          }
+
+          sendProgress = resetProgressFlag(); 
+          remainingRetries = MAX_RETRIES;
+        } 
+        catch (Throwable t) {
+          LOG.info("Communication exception: " + StringUtils.stringifyException(t));
+          remainingRetries -=1;
+          if (remainingRetries == 0) {
+            ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+            LOG.warn("Last retry, killing "+taskId);
+            System.exit(65);
           }
-          setProgressFlag();
-        }
-        public InputSplit getInputSplit() throws UnsupportedOperationException {
-          return Task.this.getInputSplit();
         }
-      };
+      }
+    }
+    public void startCommunicationThread() {
+      if (pingThread == null) {
+        pingThread = new Thread(this, "communication thread");
+        pingThread.setDaemon(true);
+        pingThread.start();
+      }
+    }
+    public void stopCommunicationThread() throws InterruptedException {
+      if (pingThread != null) {
+        pingThread.interrupt();
+        pingThread.join();
+      }
+    }
   }
   
   /**
@@ -505,12 +538,6 @@
     umbilical.reportNextRecordRange(taskId, range);
   }
 
-  public void setProgress(float progress) {
-    taskProgress.set(progress);
-    // indicate that progress update needs to be sent
-    setProgressFlag();
-  }
-
   /**
    * An updater that tracks the last number reported for a given file
    * system and only creates the counters when they are needed.
@@ -569,14 +596,15 @@
     }
   }
 
-  public void done(TaskUmbilicalProtocol umbilical) throws IOException {
+  public void done(TaskUmbilicalProtocol umbilical,
+                   TaskReporter reporter
+                   ) throws IOException, InterruptedException {
     LOG.info("Task:" + taskId + " is done."
              + " And is in the process of commiting");
     updateCounters();
 
-    OutputCommitter outputCommitter = conf.getOutputCommitter();
     // check whether the commit is required.
-    boolean commitRequired = outputCommitter.needsTaskCommit(taskContext);
+    boolean commitRequired = committer.needsTaskCommit(taskContext);
     if (commitRequired) {
       int retries = MAX_RETRIES;
       taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
@@ -597,13 +625,10 @@
         }
       }
       //wait for commit approval and commit
-      commit(umbilical, outputCommitter);
+      commit(umbilical, reporter, committer);
     }
     taskDone.set(true);
-    pingProgressThread.interrupt();
-    try {
-      pingProgressThread.join();
-    } catch (InterruptedException ie) {}
+    reporter.stopCommunicationThread();
     sendLastUpdate(umbilical);
     //signal the tasktracker that we are done
     sendDone(umbilical);
@@ -666,7 +691,9 @@
   }
 
   private void commit(TaskUmbilicalProtocol umbilical,
-                      OutputCommitter committer) throws IOException {
+                      TaskReporter reporter,
+                      org.apache.hadoop.mapreduce.OutputCommitter committer
+                      ) throws IOException {
     int retries = MAX_RETRIES;
     while (true) {
       try {
@@ -676,7 +703,7 @@
           } catch(InterruptedException ie) {
             //ignore
           }
-          setProgressFlag();
+          reporter.setProgressFlag();
         }
         // task can Commit now  
         try {
@@ -686,7 +713,7 @@
         } catch (IOException iee) {
           LOG.warn("Failure committing: " + 
                     StringUtils.stringifyException(iee));
-          discardOutput(taskContext, committer);
+          discardOutput(taskContext);
           throw iee;
         }
       } catch (IOException ie) {
@@ -694,15 +721,15 @@
             StringUtils.stringifyException(ie));
         if (--retries == 0) {
           //if it couldn't commit successfully then delete the output
-          discardOutput(taskContext, committer);
+          discardOutput(taskContext);
           System.exit(68);
         }
       }
     }
   }
 
-  private void discardOutput(TaskAttemptContext taskContext,
-                             OutputCommitter committer) {
+  private 
+  void discardOutput(TaskAttemptContext taskContext) {
     try {
       committer.abortTask(taskContext);
     } catch (IOException ioe)  {
@@ -711,22 +738,24 @@
     }
   }
 
-  protected void runCleanup(TaskUmbilicalProtocol umbilical) 
-  throws IOException {
+  protected void runCleanup(TaskUmbilicalProtocol umbilical,
+                            TaskReporter reporter
+                            ) throws IOException, InterruptedException {
     // set phase for this task
     setPhase(TaskStatus.Phase.CLEANUP);
     getProgress().setStatus("cleanup");
     // do the cleanup
-    conf.getOutputCommitter().cleanupJob(jobContext);
-    done(umbilical);
+    committer.cleanupJob(jobContext);
+    done(umbilical, reporter);
   }
 
-  protected void runSetupJob(TaskUmbilicalProtocol umbilical) 
-  throws IOException {
+  protected void runSetupJob(TaskUmbilicalProtocol umbilical,
+                             TaskReporter reporter
+                             ) throws IOException, InterruptedException {
     // do the setup
     getProgress().setStatus("setup");
-    conf.getOutputCommitter().setupJob(jobContext);
-    done(umbilical);
+    committer.setupJob(jobContext);
+    done(umbilical, reporter);
   }
   
   public void setConf(Configuration conf) {
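
A note for readers of the TaskReporter change above: the communication thread sends a
full status update (counters included) only when the task has set the progress flag,
and otherwise just pings the TaskTracker; if the parent no longer recognizes the task,
the child exits. The following is a minimal, self-contained sketch of that loop with
simplified stand-in types -- "Umbilical" below is a placeholder for TaskUmbilicalProtocol,
the 3 second interval and the task ID string are assumptions, and the retry bookkeeping
from the real run() is omitted.

import java.util.concurrent.atomic.AtomicBoolean;

public class ReporterLoopSketch implements Runnable {

  interface Umbilical {                       // stand-in for TaskUmbilicalProtocol
    boolean statusUpdate(String taskId, float progress);
    boolean ping(String taskId);
  }

  private static final long PROGRESS_INTERVAL = 3000;  // assumed interval, milliseconds
  private final Umbilical umbilical;
  private final String taskId;
  private final AtomicBoolean progressFlag = new AtomicBoolean(false);
  private final AtomicBoolean taskDone = new AtomicBoolean(false);
  private volatile float progress = 0f;
  private Thread pingThread;

  ReporterLoopSketch(Umbilical umbilical, String taskId) {
    this.umbilical = umbilical;
    this.taskId = taskId;
  }

  void setProgress(float p) {                 // called by the task as it works
    progress = p;
    progressFlag.set(true);
  }

  public void run() {
    while (!taskDone.get()) {
      try {
        Thread.sleep(PROGRESS_INTERVAL);
      } catch (InterruptedException e) {
        break;                                // stopCommunicationThread() interrupts us
      }
      // Send a full update only if progress was made; otherwise just prove we are alive.
      boolean taskFound = progressFlag.getAndSet(false)
          ? umbilical.statusUpdate(taskId, progress)
          : umbilical.ping(taskId);
      if (!taskFound) {                       // parent lost track of this task: give up
        System.exit(66);
      }
    }
  }

  void startCommunicationThread() {
    pingThread = new Thread(this, "communication thread");
    pingThread.setDaemon(true);
    pingThread.start();
  }

  void stopCommunicationThread() throws InterruptedException {
    taskDone.set(true);
    if (pingThread != null) {
      pingThread.interrupt();
      pingThread.join();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Umbilical parent = new Umbilical() {      // trivial in-process parent for the demo
      public boolean statusUpdate(String id, float p) { System.out.println(id + " " + p); return true; }
      public boolean ping(String id) { System.out.println("ping " + id); return true; }
    };
    ReporterLoopSketch r = new ReporterLoopSketch(parent, "attempt_0_0_m_0_0");
    r.startCommunicationThread();
    r.setProgress(0.5f);
    Thread.sleep(2 * PROGRESS_INTERVAL);      // let the loop report at least once
    r.stopCommunicationThread();
  }
}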

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java Mon Dec 15 14:21:32 2008
@@ -19,20 +19,23 @@
 
 import org.apache.hadoop.util.Progressable;
 
-public class TaskAttemptContext extends JobContext {
+/**
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.TaskAttemptContext}
+ *   instead.
+ */
+@Deprecated
+public class TaskAttemptContext 
+       extends org.apache.hadoop.mapreduce.TaskAttemptContext {
+  private Progressable progress;
 
-  private JobConf conf;
-  private TaskAttemptID taskid;
-  
   TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
     this(conf, taskid, Reporter.NULL);
   }
   
   TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
                      Progressable progress) {
-    super(conf, progress);
-    this.conf = conf;
-    this.taskid = taskid;
+    super(conf, taskid);
+    this.progress = progress;
   }
   
   /**
@@ -41,16 +44,19 @@
    * @return TaskAttemptID
    */
   public TaskAttemptID getTaskAttemptID() {
-    return taskid;
+    return (TaskAttemptID) super.getTaskAttemptID();
+  }
+  
+  public Progressable getProgressible() {
+    return progress;
   }
   
-  /**
-   * Get the job Configuration.
-   * 
-   * @return JobConf
-   */
   public JobConf getJobConf() {
-    return conf;
+    return (JobConf) getConfiguration();
   }
 
+  @Override
+  public void progress() {
+    progress.progress();
+  }
 }
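
The pattern above -- the deprecated class extends its org.apache.hadoop.mapreduce
replacement and overrides accessors only to narrow the return type -- recurs in the ID
classes below. A small sketch of that idiom with hypothetical stand-in types (not the
real API); the cast is safe only because, as in the real code, the base class always
constructs the old subtype:

class BaseId { }
class LegacyId extends BaseId { }

class BaseContext {
  public BaseId getId() { return new LegacyId(); }   // framework constructs the old subtype
}

class LegacyContext extends BaseContext {
  @Override
  public LegacyId getId() {                  // covariant return: old-API callers get the
    return (LegacyId) super.getId();         // old type back without casting themselves
  }

  public static void main(String[] args) {
    System.out.println(new LegacyContext().getId() instanceof LegacyId);   // true
  }
}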

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java Mon Dec 15 14:21:32 2008
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 /**
@@ -42,9 +41,8 @@
  * @see JobID
  * @see TaskID
  */
-public class TaskAttemptID extends ID {
-  private static final String ATTEMPT = "attempt";
-  private final TaskID taskId;
+@Deprecated
+public class TaskAttemptID extends org.apache.hadoop.mapreduce.TaskAttemptID {
   
   /**
    * Constructs a TaskAttemptID object from given {@link TaskID}.  
@@ -52,11 +50,7 @@
    * @param id the task attempt number
    */
   public TaskAttemptID(TaskID taskId, int id) {
-    super(id);
-    if(taskId == null) {
-      throw new IllegalArgumentException("taskId cannot be null");
-    }
-    this.taskId = taskId;
+    super(taskId, id);
   }
   
   /**
@@ -73,77 +67,31 @@
   }
   
   public TaskAttemptID() { 
-    taskId = new TaskID();
-  }
-  
-  /** Returns the {@link JobID} object that this task attempt belongs to */
-  public JobID getJobID() {
-    return taskId.getJobID();
-  }
-  
-  /** Returns the {@link TaskID} object that this task attempt belongs to */
-  public TaskID getTaskID() {
-    return taskId;
-  }
-  
-  /**Returns whether this TaskAttemptID is a map ID */
-  public boolean isMap() {
-    return taskId.isMap();
-  }
-  
-  @Override
-  public boolean equals(Object o) {
-    if (!super.equals(o))
-      return false;
-    if(o.getClass().equals(TaskAttemptID.class)) {
-      TaskAttemptID that = (TaskAttemptID)o;
-      return this.id==that.id
-             && this.taskId.equals(that.taskId);
-    }
-    else return false;
-  }
-  
-  /**Compare TaskIds by first tipIds, then by task numbers. */
-  @Override
-  public int compareTo(ID o) {
-    TaskAttemptID that = (TaskAttemptID)o;
-    int tipComp = this.taskId.compareTo(that.taskId);
-    if(tipComp == 0) {
-      return this.id - that.id;
-    }
-    else return tipComp;
-  }
-  @Override
-  public String toString() { 
-    return appendTo(new StringBuilder(ATTEMPT)).toString();
+    super(new TaskID(), 0);
   }
 
   /**
-   * Add the unique string to the StringBuilder
-   * @param builder the builder to append ot
-   * @return the builder that was passed in.
+   * Downgrade a new TaskAttemptID to an old one
+   * @param old the new id
+   * @return either old or a new TaskAttemptID constructed to match old
    */
-  protected StringBuilder appendTo(StringBuilder builder) {
-    return taskId.appendTo(builder).append(SEPARATOR).append(id);
-  }
-  
-  @Override
-  public int hashCode() {
-    return taskId.hashCode() * 5 + id;
+  public static 
+  TaskAttemptID downgrade(org.apache.hadoop.mapreduce.TaskAttemptID old) {
+    if (old instanceof TaskAttemptID) {
+      return (TaskAttemptID) old;
+    } else {
+      return new TaskAttemptID(TaskID.downgrade(old.getTaskID()), old.getId());
+    }
   }
-  
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    taskId.readFields(in);
+
+  public TaskID getTaskID() {
+    return (TaskID) super.getTaskID();
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    taskId.write(out);
+  public JobID getJobID() {
+    return (JobID) super.getJobID();
   }
-  
+
   @Deprecated
   public static TaskAttemptID read(DataInput in) throws IOException {
     TaskAttemptID taskId = new TaskAttemptID();
@@ -157,25 +105,8 @@
    */
   public static TaskAttemptID forName(String str
                                       ) throws IllegalArgumentException {
-    if(str == null)
-      return null;
-    try {
-      String[] parts = str.split(Character.toString(SEPARATOR));
-      if(parts.length == 6) {
-        if(parts[0].equals(ATTEMPT)) {
-          boolean isMap = false;
-          if(parts[3].equals("m")) isMap = true;
-          else if(parts[3].equals("r")) isMap = false;
-          else throw new Exception();
-          return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
-              isMap, Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
-        }
-      }
-    } catch (Exception ex) {
-      //fall below
-    }
-    throw new IllegalArgumentException("TaskAttemptId string : " + str 
-        + " is not properly formed");
+    return (TaskAttemptID) 
+             org.apache.hadoop.mapreduce.TaskAttemptID.forName(str);
   }
   
   /** 
@@ -215,5 +146,4 @@
         .append(attemptId != null ? attemptId : "[0-9]*");
     return builder;
   }
-  
 }
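
The new downgrade() helper above keeps old-API callers working when the framework hands
them a new org.apache.hadoop.mapreduce.TaskAttemptID: if the object is already the
deprecated subtype it is returned as-is, otherwise an equivalent old-style ID is rebuilt
from its parts. The same idiom in isolation, with hypothetical stand-in classes:

class NewId {
  private final int id;
  NewId(int id) { this.id = id; }
  int getId() { return id; }
}

class OldId extends NewId {
  OldId(int id) { super(id); }

  /** Return the argument unchanged if it is already an OldId, otherwise rewrap it. */
  static OldId downgrade(NewId maybeNew) {
    if (maybeNew instanceof OldId) {
      return (OldId) maybeNew;               // already the deprecated subtype: no copy
    }
    return new OldId(maybeNew.getId());      // rebuild an equivalent old-API object
  }

  public static void main(String[] args) {
    NewId fresh = new NewId(3);
    System.out.println(downgrade(fresh).getId());   // 3, rewrapped as an OldId
  }
}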

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskID.java Mon Dec 15 14:21:32 2008
@@ -19,9 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.text.NumberFormat;
 
 /**
  * TaskID represents the immutable and unique identifier for 
@@ -45,16 +43,8 @@
  * @see JobID
  * @see TaskAttemptID
  */
-public class TaskID extends ID {
-  private static final String TASK = "task";
-  private static final NumberFormat idFormat = NumberFormat.getInstance();
-  static {
-    idFormat.setGroupingUsed(false);
-    idFormat.setMinimumIntegerDigits(6);
-  }
-  
-  private JobID jobId;
-  private boolean isMap;
+@Deprecated
+public class TaskID extends org.apache.hadoop.mapreduce.TaskID {
 
   /**
    * Constructs a TaskID object from given {@link JobID}.  
@@ -62,13 +52,8 @@
    * @param isMap whether the tip is a map 
    * @param id the tip number
    */
-  public TaskID(JobID jobId, boolean isMap, int id) {
-    super(id);
-    if(jobId == null) {
-      throw new IllegalArgumentException("jobId cannot be null");
-    }
-    this.jobId = jobId;
-    this.isMap = isMap;
+  public TaskID(org.apache.hadoop.mapreduce.JobID jobId, boolean isMap,int id) {
+    super(jobId, isMap, id);
   }
   
   /**
@@ -82,81 +67,22 @@
     this(new JobID(jtIdentifier, jobId), isMap, id);
   }
   
-  public TaskID() { 
-    jobId = new JobID();
-  }
-  
-  /** Returns the {@link JobID} object that this tip belongs to */
-  public JobID getJobID() {
-    return jobId;
-  }
-  
-  /**Returns whether this TaskID is a map ID */
-  public boolean isMap() {
-    return isMap;
+  public TaskID() {
+    super(new JobID(), false, 0);
   }
   
-  @Override
-  public boolean equals(Object o) {
-    if (!super.equals(o))
-      return false;
-
-    TaskID that = (TaskID)o;
-    return this.isMap == that.isMap && this.jobId.equals(that.jobId);
-  }
-
-  /**Compare TaskInProgressIds by first jobIds, then by tip numbers. Reduces are 
-   * defined as greater then maps.*/
-  @Override
-  public int compareTo(ID o) {
-    TaskID that = (TaskID)o;
-    int jobComp = this.jobId.compareTo(that.jobId);
-    if (jobComp == 0) {
-      if (this.isMap == that.isMap) {
-        return this.id - that.id;
-      } else {
-        return this.isMap ? -1 : 1;
-      }
-    } else {
-      return jobComp;
-    }
-  }
-  
-  @Override
-  public String toString() { 
-    return appendTo(new StringBuilder(TASK)).toString();
-  }
-
   /**
-   * Add the unique string to the given builder.
-   * @param builder the builder to append to
-   * @return the builder that was passed in
+   * Downgrade a new TaskID to an old one
+   * @param old a new or old TaskID
+   * @return either old or a new TaskID built to match old
    */
-  protected StringBuilder appendTo(StringBuilder builder) {
-    return jobId.appendTo(builder).
-                 append(SEPARATOR).
-                 append(isMap ? 'm' : 'r').
-                 append(SEPARATOR).
-                 append(idFormat.format(id));
-  }
-  
-  @Override
-  public int hashCode() {
-    return jobId.hashCode() * 524287 + id;
-  }
-  
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    jobId.readFields(in);
-    isMap = in.readBoolean();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    jobId.write(out);
-    out.writeBoolean(isMap);
+  public static TaskID downgrade(org.apache.hadoop.mapreduce.TaskID old) {
+    if (old instanceof TaskID) {
+      return (TaskID) old;
+    } else {
+      return new TaskID(JobID.downgrade(old.getJobID()), old.isMap(), 
+                        old.getId());
+    }
   }
 
   @Deprecated
@@ -166,32 +92,10 @@
     return tipId;
   }
   
-  /** Construct a TaskID object from given string 
-   * @return constructed TaskID object or null if the given String is null
-   * @throws IllegalArgumentException if the given string is malformed
-   */
-  public static TaskID forName(String str) 
-    throws IllegalArgumentException {
-    if(str == null)
-      return null;
-    try {
-      String[] parts = str.split(Character.toString(SEPARATOR));
-      if(parts.length == 5) {
-        if(parts[0].equals(TASK)) {
-          boolean isMap = false;
-          if(parts[3].equals("m")) isMap = true;
-          else if(parts[3].equals("r")) isMap = false;
-          else throw new Exception();
-          return new TaskID(parts[1], Integer.parseInt(parts[2]),
-              isMap, Integer.parseInt(parts[4]));
-        }
-      }
-    }catch (Exception ex) {//fall below
-    }
-    throw new IllegalArgumentException("TaskId string : " + str 
-        + " is not properly formed");
+  public JobID getJobID() {
+    return (JobID) super.getJobID();
   }
-  
+
   /** 
    * Returns a regex pattern which matches task IDs. Arguments can 
    * be given null, in which case that part of the regex will be generic.  
@@ -226,5 +130,10 @@
       .append(taskId != null ? idFormat.format(taskId) : "[0-9]*");
     return builder;
   }
-  
+
+  public static TaskID forName(String str
+                               ) throws IllegalArgumentException {
+    return (TaskID) org.apache.hadoop.mapreduce.TaskID.forName(str);
+  }
+
 }
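
forName() now delegates to the new class, but the string layout it accepts is the one
the removed parsing code shows: five underscore-separated fields for a TaskID ("task",
tracker identifier, job number, "m" or "r", task number), with attempt IDs adding a
sixth field. A throwaway parse of a hypothetical ID string following that layout (an
illustration only, not the Hadoop parser itself):

public class TaskIdFormatSketch {
  public static void main(String[] args) {
    String tip = "task_200812151421_0001_m_000003";   // hypothetical example value
    String[] parts = tip.split("_");
    boolean isMap = parts[3].equals("m");             // "m" for map, "r" for reduce
    int taskNumber = Integer.parseInt(parts[4]);
    System.out.println("job=" + parts[1] + "_" + parts[2]
        + " map=" + isMap + " task=" + taskNumber);
  }
}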

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java Mon Dec 15 14:21:32 2008
@@ -30,7 +30,7 @@
 
 /** A report on the state of a task. */
 public class TaskReport implements Writable {
-  private final TaskID taskid;
+  private TaskID taskid;
   private float progress;
   private String state;
   private String[] diagnostics;
@@ -172,7 +172,7 @@
       return false;
     if(o.getClass().equals(TaskReport.class)) {
       TaskReport report = (TaskReport) o;
-      return counters.contentEquals(report.getCounters())
+      return counters.equals(report.getCounters())
              && Arrays.toString(this.diagnostics)
                       .equals(Arrays.toString(report.getDiagnostics()))
              && this.finishTime == report.getFinishTime()
@@ -215,11 +215,11 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    taskid.readFields(in);
-    progress = in.readFloat();
-    state = Text.readString(in);
-    startTime = in.readLong(); 
-    finishTime = in.readLong();
+    this.taskid.readFields(in);
+    this.progress = in.readFloat();
+    this.state = Text.readString(in);
+    this.startTime = in.readLong(); 
+    this.finishTime = in.readLong();
     
     diagnostics = WritableUtils.readStringArray(in);
     counters = new Counters();
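
The readFields() hunk above keeps the one rule every Writable has to obey: fields are
read back in exactly the order write() emitted them. A simplified round trip showing
that contract, with plain DataOutput/DataInput, writeUTF standing in for Text.writeString,
and only a few stand-in fields rather than TaskReport's real ones:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class ReportRoundTrip {
  private String state = "";
  private float progress;
  private long startTime;

  void write(DataOutput out) throws IOException {
    out.writeUTF(state);                     // same order...
    out.writeFloat(progress);
    out.writeLong(startTime);
  }

  void readFields(DataInput in) throws IOException {
    state = in.readUTF();                    // ...same order back
    progress = in.readFloat();
    startTime = in.readLong();
  }

  public static void main(String[] args) throws IOException {
    ReportRoundTrip a = new ReportRoundTrip();
    a.state = "RUNNING"; a.progress = 0.5f; a.startTime = 1L;

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    a.write(new DataOutputStream(buf));

    ReportRoundTrip b = new ReportRoundTrip();
    b.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(b.state + " " + b.progress + " " + b.startTime);
  }
}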

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java Mon Dec 15 14:21:32 2008
@@ -27,7 +27,11 @@
 
 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
  * Either linefeed or carriage-return are used to signal end of line.  Keys are
- * the position in the file, and values are the line of text.. */
+ * the position in the file, and values are the line of text.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.input.TextInputFormat}
+ *  instead.
+ */
+@Deprecated
 public class TextInputFormat extends FileInputFormat<LongWritable, Text>
   implements JobConfigurable {
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -32,7 +32,11 @@
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.*;
 
-/** An {@link OutputFormat} that writes plain text files. */
+/** An {@link OutputFormat} that writes plain text files. 
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.output.TextOutputFormat} instead.
+ */
+@Deprecated
 public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
 
   protected static class LineRecordWriter<K, V>

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/HashPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/HashPartitioner.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/HashPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/HashPartitioner.java Mon Dec 15 14:21:32 2008
@@ -21,7 +21,11 @@
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.JobConf;
 
-/** Partition keys by their {@link Object#hashCode()}. */
+/** Partition keys by their {@link Object#hashCode()}. 
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.partition.HashPartitioner} instead.
+ */
+@Deprecated
 public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
 
   public void configure(JobConf job) {}
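
For reference, hash partitioning of the kind this class performs assigns a key to a
reduce bucket by masking the sign bit of its hashCode and taking the remainder modulo
the number of reduces. A standalone sketch of that rule (an illustration, not the
Hadoop class itself):

public class HashPartitionSketch {
  static int getPartition(Object key, int numReduceTasks) {
    // clearing the sign bit keeps the modulo result non-negative
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    System.out.println(getPartition("hadoop", 4));   // deterministic bucket in [0, 4)
  }
}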

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityMapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityMapper.java Mon Dec 15 14:21:32 2008
@@ -25,7 +25,10 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapReduceBase;
 
-/** Implements the identity function, mapping inputs directly to outputs. */
+/** Implements the identity function, mapping inputs directly to outputs. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Mapper} instead.
+ */
+@Deprecated
 public class IdentityMapper<K, V>
     extends MapReduceBase implements Mapper<K, V, K, V> {
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityReducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityReducer.java Mon Dec 15 14:21:32 2008
@@ -27,7 +27,10 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapReduceBase;
 
-/** Performs no reduction, writing all input values directly to the output. */
+/** Performs no reduction, writing all input values directly to the output. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Reducer} instead.
+ */
+@Deprecated
 public class IdentityReducer<K, V>
     extends MapReduceBase implements Reducer<K, V, K, V> {
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InverseMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InverseMapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InverseMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InverseMapper.java Mon Dec 15 14:21:32 2008
@@ -25,7 +25,11 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-/** A {@link Mapper} that swaps keys and values. */
+/** A {@link Mapper} that swaps keys and values. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.map.InverseMapper} 
+ *   instead.
+ */
+@Deprecated
 public class InverseMapper<K, V>
     extends MapReduceBase implements Mapper<K, V, V, K> {
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LongSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LongSumReducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LongSumReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LongSumReducer.java Mon Dec 15 14:21:32 2008
@@ -28,7 +28,11 @@
 
 import org.apache.hadoop.io.LongWritable;
 
-/** A {@link Reducer} that sums long values. */
+/** A {@link Reducer} that sums long values. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer}
+ *    instead.
+ */
+@Deprecated
 public class LongSumReducer<K> extends MapReduceBase
     implements Reducer<K, LongWritable, K, LongWritable> {
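
What the class does per key is a single pass over the values, summing them into one
output record. A plain-Java sketch of that step, using boxed longs instead of
LongWritable so it runs without Hadoop on the classpath:

import java.util.Arrays;
import java.util.Iterator;

public class LongSumSketch {
  static long reduce(Iterator<Long> values) {
    long sum = 0;
    while (values.hasNext()) {
      sum += values.next();                  // the real reducer reads values.next().get()
    }
    return sum;
  }

  public static void main(String[] args) {
    System.out.println(reduce(Arrays.asList(1L, 2L, 3L).iterator()));   // prints 6
  }
}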
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NullOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NullOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NullOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -27,7 +27,10 @@
 
 /**
  * Consume all outputs and put them in /dev/null. 
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.output.NullOutputFormat} instead.
  */
+@Deprecated
 public class NullOutputFormat<K, V> implements OutputFormat<K, V> {
   
   public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TokenCountMapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TokenCountMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TokenCountMapper.java Mon Dec 15 14:21:32 2008
@@ -30,7 +30,11 @@
 
 
 /** A {@link Mapper} that maps text values into <token,freq> pairs.  Uses
- * {@link StringTokenizer} to break text into tokens. */
+ * {@link StringTokenizer} to break text into tokens. 
+ * @deprecated Use 
+ *    {@link org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper} instead.
+ */
+@Deprecated
 public class TokenCountMapper<K> extends MapReduceBase
     implements Mapper<K, Text, Text, LongWritable> {
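
Per the javadoc, the map step breaks each text value into tokens with StringTokenizer
and emits one <token, count> pair per token (a count of 1, with aggregation left to a
reducer such as LongSumReducer). A plain-Java sketch of that step; emit() below is a
stand-in for OutputCollector.collect():

import java.util.StringTokenizer;

public class TokenCountSketch {
  static void emit(String token, long count) {
    System.out.println(token + "\t" + count);
  }

  public static void main(String[] args) {
    String line = "to be or not to be";               // hypothetical input value
    StringTokenizer tokens = new StringTokenizer(line);
    while (tokens.hasMoreTokens()) {
      emit(tokens.nextToken(), 1L);
    }
  }
}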
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java Mon Dec 15 14:21:32 2008
@@ -38,39 +38,60 @@
  */
 public class Counter implements Writable {
 
+  private String name;
   private String displayName;
-  private long value;
+  private long value = 0;
     
-  Counter() { 
-    value = 0L;
+  protected Counter() { 
   }
 
-  Counter(String displayName, long value) {
+  protected Counter(String name, String displayName) {
+    this.name = name;
+    this.displayName = displayName;
+  }
+  
+  @Deprecated
+  protected synchronized void setDisplayName(String displayName) {
     this.displayName = displayName;
-    this.value = value;
   }
     
   /**
    * Read the binary representation of the counter
    */
+  @Override
   public synchronized void readFields(DataInput in) throws IOException {
-    displayName = Text.readString(in);
+    name = Text.readString(in);
+    if (in.readBoolean()) {
+      displayName = Text.readString(in);
+    } else {
+      displayName = name;
+    }
     value = WritableUtils.readVLong(in);
   }
     
   /**
    * Write the binary representation of the counter
    */
+  @Override
   public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, displayName);
+    Text.writeString(out, name);
+    boolean distinctDisplayName = ! name.equals(displayName);
+    out.writeBoolean(distinctDisplayName);
+    if (distinctDisplayName) {
+      Text.writeString(out, displayName);
+    }
     WritableUtils.writeVLong(out, value);
   }
-    
+
+  public synchronized String getName() {
+    return name;
+  }
+
   /**
    * Get the name of the counter.
    * @return the user facing name of the counter
    */
-  public String getDisplayName() {
+  public synchronized String getDisplayName() {
     return displayName;
   }
     
@@ -89,4 +110,22 @@
   public synchronized void increment(long incr) {
     value += incr;
   }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof Counter) {
+      synchronized (genericRight) {
+        Counter right = (Counter) genericRight;
+        return name.equals(right.name) && 
+               displayName.equals(right.displayName) &&
+               value == right.value;
+      }
+    }
+    return false;
+  }
+  
+  @Override
+  public synchronized int hashCode() {
+    return name.hashCode() + displayName.hashCode();
+  }
 }


