hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r902727 - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/terasort/ src/java/org/apache/hadoop/mapreduce/lib/input/
Date: Mon, 25 Jan 2010 08:47:43 GMT
Author: cdouglas
Date: Mon Jan 25 08:47:42 2010
New Revision: 902727

URL: http://svn.apache.org/viewvc?rev=902727&view=rev
Log:
MAPREDUCE-361. Port terasort example to the new mapreduce API. Contributed by Amareshwari Sriramadasu

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
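
The change is a mechanical port from the deprecated org.apache.hadoop.mapred interfaces (JobConf, JobClient, MapReduceBase, OutputCollector, Reporter) to the org.apache.hadoop.mapreduce classes (Job, and Mapper/Reducer with Context objects). For orientation, a driver in the new style looks roughly like the sketch below; the class name and argument handling are placeholders and not part of this patch, while the Job.getInstance(Cluster, Configuration) form mirrors what the ported examples use on trunk.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class NewApiDriverSketch extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    // Job wraps the configuration; JobConf and JobClient are no longer used.
    Job job = Job.getInstance(new Cluster(getConf()), getConf());
    job.setJobName("example");
    job.setJarByClass(NewApiDriverSketch.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion replaces JobClient.runJob and reports success directly.
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner is now seeded with a plain Configuration rather than a JobConf.
    System.exit(ToolRunner.run(new Configuration(), new NewApiDriverSketch(), args));
  }
}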

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jan 25 08:47:42 2010
@@ -127,6 +127,9 @@
     MAPREDUCE-1337. Use generics in StreamJob to improve readability of that
     class. (Kay Kay via cdouglas)
 
+    MAPREDUCE-361. Port terasort example to the new mapreduce API. (Amareshwari
+    Sriramadasu via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java Mon Jan 25 08:47:42 2010
@@ -18,40 +18,31 @@
 package org.apache.hadoop.examples.terasort;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.zip.Checksum;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 public class TeraChecksum extends Configured implements Tool {
-  static class ChecksumMapper extends MapReduceBase 
-         implements Mapper<Text,Text,NullWritable,Unsigned16> {
-    private OutputCollector<NullWritable,Unsigned16> output;
+  static class ChecksumMapper 
+      extends Mapper<Text, Text, NullWritable, Unsigned16> {
     private Unsigned16 checksum = new Unsigned16();
     private Unsigned16 sum = new Unsigned16();
     private Checksum crc32 = new PureJavaCrc32();
 
     public void map(Text key, Text value, 
-                    OutputCollector<NullWritable,Unsigned16> output,
-                    Reporter reporter) throws IOException {
-      if (this.output == null) {
-        this.output = output;
-      }
+                    Context context) throws IOException {
       crc32.reset();
       crc32.update(key.getBytes(), 0, key.getLength());
       crc32.update(value.getBytes(), 0, value.getLength());
@@ -59,23 +50,22 @@
       sum.add(checksum);
     }
 
-    public void close() throws IOException {
-      if (output != null) {
-        output.collect(NullWritable.get(), sum);
-      }
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      context.write(NullWritable.get(), sum);
     }
   }
 
-  static class ChecksumReducer extends MapReduceBase 
-         implements Reducer<NullWritable,Unsigned16,NullWritable,Unsigned16> {
-    public void reduce(NullWritable key, Iterator<Unsigned16> values,
-                       OutputCollector<NullWritable, Unsigned16> output, 
-                       Reporter reporter) throws IOException {
+  static class ChecksumReducer 
+      extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> {
+
+    public void reduce(NullWritable key, Iterable<Unsigned16> values,
+        Context context) throws IOException, InterruptedException  {
       Unsigned16 sum = new Unsigned16();
-      while (values.hasNext()) {
-        sum.add(values.next());
+      for (Unsigned16 val : values) {
+        sum.add(val);
       }
-      output.collect(key, sum);
+      context.write(key, sum);
     }
   }
 
@@ -84,10 +74,10 @@
   }
 
   public int run(String[] args) throws Exception {
-    JobConf job = (JobConf) getConf();
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     if (args.length != 2) {
       usage();
-      return 1;
+      return 2;
     }
     TeraInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
@@ -99,16 +89,15 @@
     job.setOutputValueClass(Unsigned16.class);
     // force a single reducer
     job.setNumReduceTasks(1);
-    job.setInputFormat(TeraInputFormat.class);
-    JobClient.runJob(job);
-    return 0;
+    job.setInputFormatClass(TeraInputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   /**
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraChecksum(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraChecksum(), args);
     System.exit(res);
   }
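
A recurring pattern in this port: the old mappers cached the OutputCollector handed to map() so they could emit a final record from close(). In the new API the equivalent hook is cleanup(Context), and the single Context object handles both output and reporting, so nothing needs to be cached. A minimal sketch of that shape, with placeholder types unrelated to terasort:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Accumulates a per-task total and emits it once, when the task finishes.
public class SummingMapperSketch
    extends Mapper<Text, Text, NullWritable, LongWritable> {
  private long total = 0;

  @Override
  public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    total += value.getLength();   // any per-record work goes here
  }

  @Override
  public void cleanup(Context context)
      throws IOException, InterruptedException {
    // Replaces the old close(): the Context is available here, so the mapper
    // no longer needs to stash an OutputCollector during map().
    context.write(NullWritable.get(), new LongWritable(total));
  }
}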
 

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java Mon Jan 25 08:47:42 2010
@@ -21,27 +21,30 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -71,12 +74,12 @@
    * An input format that assigns ranges of longs to each mapper.
    */
   static class RangeInputFormat 
-       implements InputFormat<LongWritable, NullWritable> {
+      extends InputFormat<LongWritable, NullWritable> {
     
     /**
      * An input split consisting of a range on numbers.
      */
-    static class RangeInputSplit implements InputSplit {
+    static class RangeInputSplit extends InputSplit implements Writable {
       long firstRow;
       long rowCount;
 
@@ -110,39 +113,42 @@
      * A record reader that will generate a range of numbers.
      */
     static class RangeRecordReader 
-          implements RecordReader<LongWritable, NullWritable> {
+        extends RecordReader<LongWritable, NullWritable> {
       long startRow;
       long finishedRows;
       long totalRows;
+      LongWritable key = null;
 
-      public RangeRecordReader(RangeInputSplit split) {
-        startRow = split.firstRow;
+      public RangeRecordReader() {
+      }
+      
+      public void initialize(InputSplit split, TaskAttemptContext context) 
+          throws IOException, InterruptedException {
+        startRow = ((RangeInputSplit)split).firstRow;
         finishedRows = 0;
-        totalRows = split.rowCount;
+        totalRows = ((RangeInputSplit)split).rowCount;
       }
 
       public void close() throws IOException {
         // NOTHING
       }
 
-      public LongWritable createKey() {
-        return new LongWritable();
+      public LongWritable getCurrentKey() {
+        return key;
       }
 
-      public NullWritable createValue() {
+      public NullWritable getCurrentValue() {
         return NullWritable.get();
       }
 
-      public long getPos() throws IOException {
-        return finishedRows;
-      }
-
       public float getProgress() throws IOException {
         return finishedRows / (float) totalRows;
       }
 
-      public boolean next(LongWritable key, 
-                          NullWritable value) {
+      public boolean nextKeyValue() {
+        if (key == null) {
+          key = new LongWritable();
+        }
         if (finishedRows < totalRows) {
           key.set(startRow + finishedRows);
           finishedRows += 1;
@@ -155,24 +161,25 @@
     }
 
     public RecordReader<LongWritable, NullWritable> 
-      getRecordReader(InputSplit split, JobConf job,
-                      Reporter reporter) throws IOException {
-      return new RangeRecordReader((RangeInputSplit) split);
+        createRecordReader(InputSplit split, TaskAttemptContext context) 
+        throws IOException {
+      return new RangeRecordReader();
     }
 
     /**
      * Create the desired number of splits, dividing the number of rows
      * between the mappers.
      */
-    public InputSplit[] getSplits(JobConf job, 
-                                  int numSplits) {
+    public List<InputSplit> getSplits(JobContext job) {
       long totalRows = getNumberOfRows(job);
+      int numSplits = job.getConfiguration().getInt(JobContext.NUM_MAPS, 1);
       LOG.info("Generating " + totalRows + " using " + numSplits);
-      InputSplit[] splits = new InputSplit[numSplits];
+      List<InputSplit> splits = new ArrayList<InputSplit>();
       long currentRow = 0;
-      for(int split=0; split < numSplits; ++split) {
-        long goal = (long) Math.ceil(totalRows * (double)(split+1) / numSplits);
-        splits[split] = new RangeInputSplit(currentRow, goal - currentRow);
+      for(int split = 0; split < numSplits; ++split) {
+        long goal = 
+          (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
+        splits.add(new RangeInputSplit(currentRow, goal - currentRow));
         currentRow = goal;
       }
       return splits;
@@ -180,20 +187,20 @@
 
   }
   
-  static long getNumberOfRows(JobConf job) {
-    return job.getLong(NUM_ROWS, 0);
+  static long getNumberOfRows(JobContext job) {
+    return job.getConfiguration().getLong(NUM_ROWS, 0);
   }
   
-  static void setNumberOfRows(JobConf job, long numRows) {
-    job.setLong(NUM_ROWS, numRows);
+  static void setNumberOfRows(Job job, long numRows) {
+    job.getConfiguration().setLong(NUM_ROWS, numRows);
   }
 
   /**
    * The Mapper class that given a row number, will generate the appropriate 
    * output line.
    */
-  public static class SortGenMapper extends MapReduceBase 
-      implements Mapper<LongWritable, NullWritable, Text, Text> {
+  public static class SortGenMapper 
+      extends Mapper<LongWritable, NullWritable, Text, Text> {
 
     private Text key = new Text();
     private Text value = new Text();
@@ -208,19 +215,18 @@
     private Counter checksumCounter;
 
     public void map(LongWritable row, NullWritable ignored,
-                    OutputCollector<Text, Text> output,
-                    Reporter reporter) throws IOException {
+        Context context) throws IOException, InterruptedException {
       if (rand == null) {
         rowId = new Unsigned16(row.get());
         rand = Random16.skipAhead(rowId);
-        checksumCounter = reporter.getCounter(Counters.CHECKSUM);
+        checksumCounter = context.getCounter(Counters.CHECKSUM);
       }
       Random16.nextRand(rand);
       GenSort.generateRecord(buffer, rand, rowId);
       key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
       value.set(buffer, TeraInputFormat.KEY_LENGTH, 
                 TeraInputFormat.VALUE_LENGTH);
-      output.collect(key, value);
+      context.write(key, value);
       crc32.reset();
       crc32.update(buffer, 0, 
                    TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
@@ -230,7 +236,7 @@
     }
 
     @Override
-    public void close() {
+    public void cleanup(Context context) {
       checksumCounter.increment(total.getLow8());
     }
   }
@@ -271,15 +277,16 @@
   /**
    * @param args the cli arguments
    */
-  public int run(String[] args) throws IOException {
-    JobConf job = (JobConf) getConf();
+  public int run(String[] args) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     if (args.length != 2) {
       usage();
-      return 1;
+      return 2;
     }
     setNumberOfRows(job, parseHumanLong(args[0]));
     Path outputDir = new Path(args[1]);
-    if (outputDir.getFileSystem(job).exists(outputDir)) {
+    if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
       throw new IOException("Output directory " + outputDir + 
                             " already exists.");
     }
@@ -290,14 +297,13 @@
     job.setNumReduceTasks(0);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    job.setInputFormat(RangeInputFormat.class);
-    job.setOutputFormat(TeraOutputFormat.class);
-    JobClient.runJob(job);
-    return 0;
+    job.setInputFormatClass(RangeInputFormat.class);
+    job.setOutputFormatClass(TeraOutputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraGen(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
     System.exit(res);
   }
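
One behavioural difference worth noting: the old InputFormat.getSplits(JobConf, int) received the requested number of maps as an argument, whereas the new getSplits(JobContext) does not, so RangeInputFormat now reads the map count from the configuration itself. A reduced sketch of just that idiom (the class is hypothetical; the key piece is the JobContext.NUM_MAPS constant used above):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public abstract class RowRangeInputFormatSketch
    extends InputFormat<LongWritable, NullWritable> {

  @Override
  public List<InputSplit> getSplits(JobContext job) {
    // The new API passes only a JobContext, so the desired number of mappers
    // is read from the configuration instead of arriving as a parameter.
    int numSplits = job.getConfiguration().getInt(JobContext.NUM_MAPS, 1);
    List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
    // ... build one InputSplit per mapper, as RangeInputFormat does above ...
    return splits;
  }
}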
 

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java Mon Jan 25 08:47:42 2010
@@ -22,19 +22,21 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.LineRecordReader;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.StringUtils;
@@ -47,13 +49,15 @@
 public class TeraInputFormat extends FileInputFormat<Text,Text> {
 
   static final String PARTITION_FILENAME = "_partition.lst";
-  private static final String NUM_PARTITIONS = "terasort.num.partitions";
-  private static final String SAMPLE_SIZE = "terasort.partitions.sample";
+  private static final String NUM_PARTITIONS = 
+    "mapreduce.terasort.num.partitions";
+  private static final String SAMPLE_SIZE = 
+    "mapreduce.terasort.partitions.sample";
   static final int KEY_LENGTH = 10;
   static final int VALUE_LENGTH = 90;
   static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
-  private static JobConf lastConf = null;
-  private static InputSplit[] lastResult = null;
+  private static JobContext lastContext = null;
+  private static List<InputSplit> lastResult = null;
 
   static class TeraFileSplit extends FileSplit {
     private String[] locations;
@@ -136,24 +140,25 @@
    * Use the input splits to take samples of the input and generate sample
    * keys. By default reads 100,000 keys from 10 locations in the input, sorts
    * them and picks N-1 keys to generate N equally sized partitions.
-   * @param conf the job to sample
+   * @param job the job to sample
    * @param partFile where to write the output file to
    * @throws IOException if something goes wrong
    */
-  public static void writePartitionFile(final JobConf conf, 
-                                        Path partFile) throws IOException {
+  public static void writePartitionFile(final JobContext job, 
+      Path partFile) throws IOException, InterruptedException  {
     long t1 = System.currentTimeMillis();
+    Configuration conf = job.getConfiguration();
     final TeraInputFormat inFormat = new TeraInputFormat();
     final TextSampler sampler = new TextSampler();
-    int partitions = conf.getNumReduceTasks();
+    int partitions = job.getNumReduceTasks();
     long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
-    final InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
+    final List<InputSplit> splits = inFormat.getSplits(job);
     long t2 = System.currentTimeMillis();
     System.out.println("Computing input splits took " + (t2 - t1) + "ms");
-    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.length);
-    System.out.println("Sampling " + samples + " splits of " + splits.length);
+    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
+    System.out.println("Sampling " + samples + " splits of " + splits.size());
     final long recordsPerSample = sampleSize / samples;
-    final int sampleStep = splits.length / samples;
+    final int sampleStep = splits.size() / samples;
     Thread[] samplerReader = new Thread[samples];
     // take N samples from different parts of the input
     for(int i=0; i < samples; ++i) {
@@ -164,14 +169,16 @@
           setDaemon(true);
         }
         public void run() {
-          Text key = new Text();
-          Text value = new Text();
           long records = 0;
           try {
-            RecordReader<Text,Text> reader = 
-              inFormat.getRecordReader(splits[sampleStep * idx], conf, null);
-            while (reader.next(key, value)) {
-              sampler.addKey(key);
+            TaskAttemptContext context = new TaskAttemptContextImpl(
+              job.getConfiguration(), new TaskAttemptID());
+            RecordReader<Text, Text> reader = 
+              inFormat.createRecordReader(splits.get(sampleStep * idx),
+              context);
+            reader.initialize(splits.get(sampleStep * idx), context);
+            while (reader.nextKeyValue()) {
+              sampler.addKey(new Text(reader.getCurrentKey()));
               records += 1;
               if (recordsPerSample <= records) {
                 break;
@@ -181,6 +188,8 @@
             System.err.println("Got an exception while reading splits " +
                 StringUtils.stringifyException(ie));
             System.exit(-1);
+          } catch (InterruptedException e) {
+        	  
           }
         }
       };
@@ -203,46 +212,47 @@
     System.out.println("Computing parititions took " + (t3 - t2) + "ms");
   }
 
-  static class TeraRecordReader implements RecordReader<Text,Text> {
+  static class TeraRecordReader extends RecordReader<Text,Text> {
     private FSDataInputStream in;
     private long offset;
     private long length;
     private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
     private byte[] buffer = new byte[RECORD_LENGTH];
+    private Text key;
+    private Text value;
 
-    public TeraRecordReader(Configuration job, 
-                            FileSplit split) throws IOException {
-      Path p = split.getPath();
-      FileSystem fs = p.getFileSystem(job);
+    public TeraRecordReader() throws IOException {
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      Path p = ((FileSplit)split).getPath();
+      FileSystem fs = p.getFileSystem(context.getConfiguration());
       in = fs.open(p);
-      long start = split.getStart();
+      long start = ((FileSplit)split).getStart();
       // find the offset to start at a record boundary
       offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
       in.seek(start + offset);
-      length = split.getLength();
+      length = ((FileSplit)split).getLength();
     }
 
     public void close() throws IOException {
       in.close();
     }
 
-    public Text createKey() {
-      return new Text();
-    }
-
-    public Text createValue() {
-      return new Text();
+    public Text getCurrentKey() {
+      return key;
     }
 
-    public long getPos() throws IOException {
-      return in.getPos();
+    public Text getCurrentValue() {
+      return value;
     }
 
     public float getProgress() throws IOException {
       return (float) offset / length;
     }
 
-    public boolean next(Text key, Text value) throws IOException {
+    public boolean nextKeyValue() throws IOException {
       if (offset >= length) {
         return false;
       }
@@ -258,6 +268,12 @@
         }
         read += newRead;
       }
+      if (key == null) {
+        key = new Text();
+      }
+      if (value == null) {
+        value = new Text();
+      }
       key.set(buffer, 0, KEY_LENGTH);
       value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
       offset += RECORD_LENGTH;
@@ -267,31 +283,30 @@
 
   @Override
   public RecordReader<Text, Text> 
-      getRecordReader(InputSplit split,
-                      JobConf job, 
-                      Reporter reporter) throws IOException {
-    return new TeraRecordReader(job, (FileSplit) split);
+      createRecordReader(InputSplit split, TaskAttemptContext context) 
+      throws IOException {
+    return new TeraRecordReader();
   }
 
-  @Override
   protected FileSplit makeSplit(Path file, long start, long length, 
                                 String[] hosts) {
     return new TeraFileSplit(file, start, length, hosts);
   }
 
   @Override
-  public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
-    if (conf == lastConf) {
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    if (job == lastContext) {
       return lastResult;
     }
     long t1, t2, t3;
     t1 = System.currentTimeMillis();
-    lastConf = conf;
-    lastResult = super.getSplits(conf, splits);
+    lastContext = job;
+    lastResult = super.getSplits(job);
     t2 = System.currentTimeMillis();
     System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
-    if (conf.getBoolean("terasort.use.terascheduler", true)) {
-      TeraScheduler scheduler = new TeraScheduler((FileSplit[]) lastResult, conf);
+    if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
+      TeraScheduler scheduler = new TeraScheduler(
+        lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
       lastResult = scheduler.getNewFileSplits();
       t3 = System.currentTimeMillis(); 
       System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
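
Because a new-API RecordReader is constructed unconfigured and only becomes usable after initialize(), client-side code that reads splits outside of a running task, as the sampling threads above do, has to supply its own TaskAttemptContext. A condensed sketch of that read loop, using a hypothetical helper method and omitting the terasort-specific sampling logic:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

class SplitSamplerSketch {
  // Reads up to 'limit' keys from one split on the client, outside any task.
  static void sampleKeys(InputFormat<Text, Text> inFormat, JobContext job,
      InputSplit split, long limit) throws Exception {
    // A throwaway attempt context stands in for the one a real task would get.
    TaskAttemptContext context = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<Text, Text> reader = inFormat.createRecordReader(split, context);
    reader.initialize(split, context);   // the reader is inert until initialized
    long records = 0;
    while (records < limit && reader.nextKeyValue()) {
      // getCurrentKey() may return the same object on every call, so copy it.
      Text key = new Text(reader.getCurrentKey());
      // ... hand the key to the sampler ...
      records++;
    }
    reader.close();
  }
}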

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java Mon Jan 25 08:47:42 2010
@@ -24,46 +24,43 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
  * An output format that writes the key and value appended together.
  */
 public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
-  static final String FINAL_SYNC_ATTRIBUTE = "terasort.final.sync";
+  static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
+  private OutputCommitter committer = null;
 
   /**
    * Set the requirement for a final sync before the stream is closed.
    */
-  public static void setFinalSync(JobConf conf, boolean newValue) {
-    conf.setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+  static void setFinalSync(JobContext job, boolean newValue) {
+    job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
   }
 
   /**
    * Does the user want a final sync at close?
    */
-  public static boolean getFinalSync(JobConf conf) {
-    return conf.getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+  public static boolean getFinalSync(JobContext job) {
+    return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
   }
 
-  static class TeraRecordWriter implements RecordWriter<Text,Text> {
+  static class TeraRecordWriter extends RecordWriter<Text,Text> {
     private boolean finalSync = false;
     private FSDataOutputStream out;
 
     public TeraRecordWriter(FSDataOutputStream out,
-                            JobConf conf) {
-      finalSync = getFinalSync(conf);
+                            JobContext job) {
+      finalSync = getFinalSync(job);
       this.out = out;
     }
 
@@ -73,7 +70,7 @@
       out.write(value.getBytes(), 0, value.getLength());
     }
     
-    public void close(Reporter reporter) throws IOException {
+    public void close(TaskAttemptContext context) throws IOException {
       if (finalSync) {
         out.sync();
       }
@@ -82,37 +79,41 @@
   }
 
   @Override
-  public void checkOutputSpecs(FileSystem ignored, 
-                               JobConf job
+  public void checkOutputSpecs(JobContext job
                               ) throws InvalidJobConfException, IOException {
-    // Ensure that the output directory is set and not already there
+    // Ensure that the output directory is set
     Path outDir = getOutputPath(job);
     if (outDir == null) {
       throw new InvalidJobConfException("Output directory not set in JobConf.");
     }
   }
 
-  public RecordWriter<Text,Text> getRecordWriter(FileSystem ignored,
-                                                 JobConf job,
-                                                 String name,
-                                                 Progressable progress
+  public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
                                                  ) throws IOException {
-    Path dir = getWorkOutputPath(job);
-    FileSystem fs = dir.getFileSystem(job);
-    FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+    Path file = getDefaultWorkFile(job, "");
+    FileSystem fs = file.getFileSystem(job.getConfiguration());
+     FSDataOutputStream fileOut = fs.create(file);
     return new TeraRecordWriter(fileOut, job);
   }
   
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+      throws IOException {
+    if (committer == null) {
+      Path output = getOutputPath(context);
+      committer = new TeraOutputCommitter(output, context);
+    }
+    return committer;
+  }
+
   public static class TeraOutputCommitter extends FileOutputCommitter {
 
-    @Override
-    public void commitJob(JobContext jobContext) {
+    public TeraOutputCommitter(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
     }
 
     @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-      return taskContext.getTaskAttemptID().getTaskID().getTaskType() ==
-               TaskType.REDUCE;
+    public void commitJob(JobContext jobContext) {
     }
 
     @Override
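
In the new API a FileOutputCommitter is bound to an output path and a task attempt at construction time, which is why TeraOutputCommitter gains the (Path, TaskAttemptContext) constructor and why the format caches the committer it hands back. A bare-bones version of that override on a stock output format, with nothing terasort-specific in it:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CommitterOverrideSketch extends TextOutputFormat<Object, Object> {
  private OutputCommitter committer;

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException {
    if (committer == null) {
      // The committer is tied to the job's output path and this task attempt.
      Path output = getOutputPath(context);
      committer = new FileOutputCommitter(output, context);
    }
    return committer;
  }
}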

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java Mon Jan 25 08:47:42 2010
@@ -25,21 +25,24 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.terasort.TeraInputFormat.TeraFileSplit;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 class TeraScheduler {
+  static String USE = "mapreduce.terasort.use.terascheduler";
   private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
-  private InputSplit[] splits;
+  private Split[] splits;
   private List<Host> hosts = new ArrayList<Host>();
   private int slotsPerHost;
   private int remainingSplits = 0;
   private FileSplit[] realSplits = null;
 
-  static class InputSplit {
+  static class Split {
     String filename;
     boolean isAssigned = false;
     List<Host> locations = new ArrayList<Host>();
-    InputSplit(String filename) {
+    Split(String filename) {
       this.filename = filename;
     }
     public String toString() {
@@ -55,7 +58,7 @@
   }
   static class Host {
     String hostname;
-    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<Split> splits = new ArrayList<Split>();
     Host(String hostname) {
       this.hostname = hostname;
     }
@@ -92,11 +95,11 @@
     }
     // read the blocks
     List<String> splitLines = readFile(splitFilename);
-    splits = new InputSplit[splitLines.size()];
+    splits = new Split[splitLines.size()];
     remainingSplits = 0;
     for(String line: splitLines) {
       StringTokenizer itr = new StringTokenizer(line);
-      InputSplit newSplit = new InputSplit(itr.nextToken());
+      Split newSplit = new Split(itr.nextToken());
       splits[remainingSplits++] = newSplit;
       while (itr.hasMoreTokens()) {
         Host host = hostIds.get(itr.nextToken());
@@ -109,11 +112,11 @@
   public TeraScheduler(FileSplit[] realSplits,
                        Configuration conf) throws IOException {
     this.realSplits = realSplits;
-    this.slotsPerHost = conf.getInt("mapred.tasktracker.map.tasks.maximum", 4);
+    this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
     Map<String, Host> hostTable = new HashMap<String, Host>();
-    splits = new InputSplit[realSplits.length];
+    splits = new Split[realSplits.length];
     for(FileSplit realSplit: realSplits) {
-      InputSplit split = new InputSplit(realSplit.getPath().toString());
+      Split split = new Split(realSplit.getPath().toString());
       splits[remainingSplits++] = split;
       for(String hostname: realSplit.getLocations()) {
         Host host = hostTable.get(hostname);
@@ -148,8 +151,8 @@
     int tasksToPick = Math.min(slotsPerHost, 
                                (int) Math.ceil((double) remainingSplits / 
                                                hosts.size()));
-    InputSplit[] best = new InputSplit[tasksToPick];
-    for(InputSplit cur: host.splits) {
+    Split[] best = new Split[tasksToPick];
+    for(Split cur: host.splits) {
       LOG.debug("  examine: " + cur.filename + " " + cur.locations.size());
       int i = 0;
       while (i < tasksToPick && best[i] != null && 
@@ -177,7 +180,7 @@
       }
     }
     // for the non-chosen blocks, remove this host
-    for(InputSplit cur: host.splits) {
+    for(Split cur: host.splits) {
       if (!cur.isAssigned) {
         cur.locations.remove(host);
       }
@@ -200,7 +203,7 @@
    *    best host as the only host.
    * @throws IOException
    */
-  public FileSplit[] getNewFileSplits() throws IOException {
+  public List<InputSplit> getNewFileSplits() throws IOException {
     solve();
     FileSplit[] result = new FileSplit[realSplits.length];
     int left = 0;
@@ -215,7 +218,11 @@
         result[right--] = realSplits[i];
       }
     }
-    return result;
+    List<InputSplit> ret = new ArrayList<InputSplit>();
+    for (FileSplit fs : result) {
+      ret.add(fs);
+    }
+    return ret;
   }
 
   public static void main(String[] args) throws IOException {
@@ -225,7 +232,7 @@
     }
     LOG.info("starting solve");
     problem.solve();
-    List<InputSplit> leftOvers = new ArrayList<InputSplit>();
+    List<Split> leftOvers = new ArrayList<Split>();
     for(int i=0; i < problem.splits.length; ++i) {
       if (problem.splits[i].isAssigned) {
         System.out.println("sched: " + problem.splits[i]);        
@@ -233,7 +240,7 @@
         leftOvers.add(problem.splits[i]);
       }
     }
-    for(InputSplit cur: leftOvers) {
+    for(Split cur: leftOvers) {
       System.out.println("left: " + cur);
     }
     System.out.println("left over: " + leftOvers.size());

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Mon Jan 25 08:47:42 2010
@@ -25,17 +25,17 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.examples.terasort.TeraOutputFormat.TeraOutputCommitter;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -48,14 +48,18 @@
  */
 public class TeraSort extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(TeraSort.class);
+  static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
+  static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";
 
   /**
    * A partitioner that splits text keys into roughly equal partitions
    * in a global sorted order.
    */
-  static class TotalOrderPartitioner implements Partitioner<Text,Text>{
+  static class TotalOrderPartitioner extends Partitioner<Text,Text>
+      implements Configurable {
     private TrieNode trie;
     private Text[] splitPoints;
+    private Configuration conf;
 
     /**
      * A generic trie node
@@ -147,9 +151,9 @@
      * @return the strings to split the partitions on
      * @throws IOException
      */
-    private static Text[] readPartitions(FileSystem fs, Path p, 
-                                         JobConf job) throws IOException {
-      int reduces = job.getNumReduceTasks();
+    private static Text[] readPartitions(FileSystem fs, Path p,
+        Configuration conf) throws IOException {
+      int reduces = conf.getInt(JobContext.NUM_REDUCES, 1);
       Text[] result = new Text[reduces - 1];
       DataInputStream reader = fs.open(p);
       for(int i=0; i < reduces - 1; ++i) {
@@ -201,17 +205,22 @@
       return result;
     }
 
-    public void configure(JobConf job) {
+    public void setConf(Configuration conf) {
       try {
-        FileSystem fs = FileSystem.getLocal(job);
+        FileSystem fs = FileSystem.getLocal(conf);
+        this.conf = conf;
         Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
-        splitPoints = readPartitions(fs, partFile, job);
+        splitPoints = readPartitions(fs, partFile, conf);
         trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
       } catch (IOException ie) {
         throw new IllegalArgumentException("can't read paritions file", ie);
       }
     }
 
+    public Configuration getConf() {
+      return conf;
+    }
+    
     public TotalOrderPartitioner() {
     }
 
@@ -225,13 +234,21 @@
    * A total order partitioner that assigns keys based on their first 
    * PREFIX_LENGTH bytes, assuming a flat distribution.
    */
-  public static class SimplePartitioner implements Partitioner<Text, Text>{
+  public static class SimplePartitioner extends Partitioner<Text, Text>
+      implements Configurable {
     int prefixesPerReduce;
     private static final int PREFIX_LENGTH = 3;
-    public void configure(JobConf job) {
+    private Configuration conf = null;
+    public void setConf(Configuration conf) {
+      this.conf = conf;
       prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / 
-                                          (float) job.getNumReduceTasks());
+        (float) conf.getInt(JobContext.NUM_REDUCES, 1));
     }
+    
+    public Configuration getConf() {
+      return conf;
+    }
+    
     @Override
     public int getPartition(Text key, Text value, int numPartitions) {
       byte[] bytes = key.getBytes();
@@ -244,35 +261,36 @@
     }
   }
 
-  public static boolean getUseSimplePartitioner(Configuration conf) {
-    return conf.getBoolean("terasort.partitioner.simple", false);
+  public static boolean getUseSimplePartitioner(JobContext job) {
+    return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false);
   }
 
-  public static void setUseSimplePartitioner(Configuration conf,
-                                             boolean value) {
-    conf.setBoolean("terasort.partitioner.simple", value);
+  public static void setUseSimplePartitioner(Job job, boolean value) {
+    job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value);
+  }
+
+  public static int getOutputReplication(JobContext job) {
+    return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);
+  }
+
+  public static void setOutputReplication(Job job, int value) {
+    job.getConfiguration().setInt(OUTPUT_REPLICATION, value);
   }
 
   public int run(String[] args) throws Exception {
     LOG.info("starting");
-    JobConf job = (JobConf) getConf();
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     Path inputDir = new Path(args[0]);
     Path outputDir = new Path(args[1]);
     boolean useSimplePartitioner = getUseSimplePartitioner(job);
-    FileSystem outputFileSystem = outputDir.getFileSystem(job);
-    outputDir = outputDir.makeQualified(outputFileSystem);
-    if (outputFileSystem.exists(outputDir)) {
-      throw new IOException("Output directory " + outputDir + 
-                            " already exists.");
-    }
     TeraInputFormat.setInputPaths(job, inputDir);
     FileOutputFormat.setOutputPath(job, outputDir);
     job.setJobName("TeraSort");
     job.setJarByClass(TeraSort.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    job.setInputFormat(TeraInputFormat.class);
-    job.setOutputFormat(TeraOutputFormat.class);
+    job.setInputFormatClass(TeraInputFormat.class);
+    job.setOutputFormatClass(TeraOutputFormat.class);
     if (useSimplePartitioner) {
       job.setPartitionerClass(SimplePartitioner.class);
     } else {
@@ -282,27 +300,25 @@
       URI partitionUri = new URI(partitionFile.toString() +
                                  "#" + TeraInputFormat.PARTITION_FILENAME);
       TeraInputFormat.writePartitionFile(job, partitionFile);
-      DistributedCache.addCacheFile(partitionUri, job);
-      DistributedCache.createSymlink(job);    
+      job.addCacheFile(partitionUri);
+      job.createSymlink();    
       long end = System.currentTimeMillis();
       System.out.println("Spent " + (end - start) + "ms computing partitions.");
       job.setPartitionerClass(TotalOrderPartitioner.class);
     }
-    job.setOutputCommitter(TeraOutputCommitter.class);
     
-    job.setInt("dfs.replication", 
-               job.getInt("terasort.output.replication", 1));
+    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
     TeraOutputFormat.setFinalSync(job, true);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     LOG.info("done");
-    return 0;
+    return ret;
   }
 
   /**
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraSort(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraSort(), args);
     System.exit(res);
   }
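
The new Partitioner base class has no configure(JobConf) hook, so a partitioner that needs per-job settings now implements Configurable and receives the Configuration through setConf, as TotalOrderPartitioner and SimplePartitioner do above. A stripped-down sketch of that shape; the partitioning rule here is invented purely for illustration:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;

public class ConfigurablePartitionerSketch extends Partitioner<Text, Text>
    implements Configurable {
  private Configuration conf;
  private int reduces;

  @Override
  public void setConf(Configuration conf) {
    // The framework calls this right after instantiating the partitioner;
    // it replaces the old configure(JobConf) callback.
    this.conf = conf;
    reduces = conf.getInt(JobContext.NUM_REDUCES, 1);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    // Placeholder rule: spread keys by their first byte across the reducers.
    int b = key.getLength() > 0 ? (key.getBytes()[0] & 0xff) : 0;
    return b * numPartitions / 256;
  }
}

The distributed-cache calls move onto Job in the same way: job.addCacheFile(uri) and job.createSymlink() replace the static DistributedCache methods, as the run() hunk above shows.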
 

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java Mon Jan 25 08:47:42 2010
@@ -19,23 +19,20 @@
 package org.apache.hadoop.examples.terasort;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.zip.Checksum;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -63,10 +60,8 @@
     return b.toString();
   }
 
-  static class ValidateMapper extends MapReduceBase 
-      implements Mapper<Text,Text,Text,Text> {
+  static class ValidateMapper extends Mapper<Text,Text,Text,Text> {
     private Text lastKey;
-    private OutputCollector<Text,Text> output;
     private String filename;
     private Unsigned16 checksum = new Unsigned16();
     private Unsigned16 tmp = new Unsigned16();
@@ -75,27 +70,22 @@
     /**
      * Get the final part of the input name
      * @param split the input split
-     * @return the "part-00000" for the input
+     * @return the "part-r-00000" for the input
      */
     private String getFilename(FileSplit split) {
       return split.getPath().getName();
     }
 
-    private int getPartition(FileSplit split) {
-      return Integer.parseInt(split.getPath().getName().substring(5));
-    }
-
-    public void map(Text key, Text value, OutputCollector<Text,Text> output,
-                    Reporter reporter) throws IOException {
+    public void map(Text key, Text value, Context context) 
+        throws IOException, InterruptedException {
       if (lastKey == null) {
-        FileSplit fs = (FileSplit) reporter.getInputSplit();
+        FileSplit fs = (FileSplit) context.getInputSplit();
         filename = getFilename(fs);
-        output.collect(new Text(filename + ":begin"), key);
+        context.write(new Text(filename + ":begin"), key);
         lastKey = new Text();
-        this.output = output;
       } else {
         if (key.compareTo(lastKey) < 0) {
-          output.collect(ERROR, new Text("misorder in " + filename + 
+          context.write(ERROR, new Text("misorder in " + filename + 
                                          " between " + textifyBytes(lastKey) + 
                                          " and " + textifyBytes(key)));
         }
@@ -109,10 +99,11 @@
       lastKey.set(key);
     }
     
-    public void close() throws IOException {
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException  {
       if (lastKey != null) {
-        output.collect(new Text(filename + ":end"), lastKey);
-        output.collect(CHECKSUM, new Text(checksum.toString()));
+        context.write(new Text(filename + ":end"), lastKey);
+        context.write(CHECKSUM, new Text(checksum.toString()));
       }
     }
   }
@@ -122,34 +113,31 @@
    * boundary keys are always increasing.
    * Also passes any error reports along intact.
    */
-  static class ValidateReducer extends MapReduceBase 
-      implements Reducer<Text,Text,Text,Text> {
+  static class ValidateReducer extends Reducer<Text,Text,Text,Text> {
     private boolean firstKey = true;
     private Text lastKey = new Text();
     private Text lastValue = new Text();
-    public void reduce(Text key, Iterator<Text> values,
-                       OutputCollector<Text, Text> output, 
-                       Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterable<Text> values,
+        Context context) throws IOException, InterruptedException  {
       if (ERROR.equals(key)) {
-        while(values.hasNext()) {
-          output.collect(key, values.next());
+        for (Text val : values) {
+          context.write(key, val);
         }
       } else if (CHECKSUM.equals(key)) {
         Unsigned16 tmp = new Unsigned16();
         Unsigned16 sum = new Unsigned16();
-        while (values.hasNext()) {
-          String val = values.next().toString();
-          tmp.set(val);
+        for (Text val : values) {
+          tmp.set(val.toString());
           sum.add(tmp);
         }
-        output.collect(CHECKSUM, new Text(sum.toString()));
+        context.write(CHECKSUM, new Text(sum.toString()));
       } else {
-        Text value = values.next();
+        Text value = values.iterator().next();
         if (firstKey) {
           firstKey = false;
         } else {
           if (value.compareTo(lastValue) < 0) {
-            output.collect(ERROR, 
+            context.write(ERROR, 
                            new Text("bad key partitioning:\n  file " + 
                                     lastKey + " key " + 
                                     textifyBytes(lastValue) +
@@ -169,7 +157,7 @@
   }
 
   public int run(String[] args) throws Exception {
-    JobConf job = (JobConf) getConf();
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     if (args.length != 2) {
       usage();
       return 1;
@@ -185,18 +173,16 @@
     // force a single reducer
     job.setNumReduceTasks(1);
     // force a single split 
-    job.setLong(org.apache.hadoop.mapreduce.lib.input.
-                FileInputFormat.SPLIT_MINSIZE, Long.MAX_VALUE);
-    job.setInputFormat(TeraInputFormat.class);
-    JobClient.runJob(job);
-    return 0;
+    FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
+    job.setInputFormatClass(TeraInputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   /**
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraValidate(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraValidate(), args);
     System.exit(res);
   }
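
On the reduce side the Iterator/OutputCollector pair becomes an Iterable and a Context, so the while/hasNext loops become for-each loops, as in ValidateReducer above. A minimal reducer in the new style, with placeholder types:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducerSketch
    extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    // Iterable replaces Iterator; the framework may reuse value objects, so
    // read each one inside the loop rather than holding references.
    for (LongWritable val : values) {
      sum += val.get();
    }
    context.write(key, new LongWritable(sum));
  }
}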
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=902727&r1=902726&r2=902727&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Mon Jan 25 08:47:42 2010
@@ -238,6 +238,14 @@
     return result;
   }
   
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts) {
+    return new FileSplit(file, start, length, hosts);
+  }
 
   /** 
    * Generate the list of files and make them into FileSplits.
@@ -261,20 +269,20 @@
         long bytesRemaining = length;
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
-          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
+          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                                    blkLocations[blkIndex].getHosts()));
           bytesRemaining -= splitSize;
         }
         
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
+          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                      blkLocations[blkLocations.length-1].getHosts()));
         }
       } else if (length != 0) {
-        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
       } else { 
         //Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
+        splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
     LOG.debug("Total # of splits: " + splits.size());
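
The one framework change in this patch is the makeSplit factory above: FileInputFormat subclasses can now substitute their own FileSplit subtype without re-implementing the split computation, which is how TeraInputFormat injects TeraFileSplit. A hypothetical subclass using the hook might look like the sketch below; note that extra state added this way is only visible on the client unless the subtype also extends the Writable serialization.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TaggedSplitInputFormatSketch extends TextInputFormat {

  // A split subtype carrying extra, client-side planning state, in the same
  // spirit as TeraFileSplit; the extra field is not serialized to tasks.
  static class TaggedFileSplit extends FileSplit {
    private String[] preferredHosts;

    public TaggedFileSplit() {}   // no-arg constructor for deserialization

    public TaggedFileSplit(Path file, long start, long length, String[] hosts) {
      super(file, start, length, hosts);
      this.preferredHosts = hosts;
    }

    String[] getPreferredHosts() {
      return preferredHosts;
    }
  }

  @Override
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    // Called by FileInputFormat.getSplits() for every split it creates.
    return new TaggedFileSplit(file, start, length, hosts);
  }
}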


