hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r903227 [9/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/grid...
Date: Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java Tue Jan 26 14:02:53 2010
@@ -21,27 +21,30 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -71,12 +74,12 @@
    * An input format that assigns ranges of longs to each mapper.
    */
   static class RangeInputFormat 
-       implements InputFormat<LongWritable, NullWritable> {
+      extends InputFormat<LongWritable, NullWritable> {
     
     /**
      * An input split consisting of a range on numbers.
      */
-    static class RangeInputSplit implements InputSplit {
+    static class RangeInputSplit extends InputSplit implements Writable {
       long firstRow;
       long rowCount;
 
@@ -110,39 +113,42 @@
      * A record reader that will generate a range of numbers.
      */
     static class RangeRecordReader 
-          implements RecordReader<LongWritable, NullWritable> {
+        extends RecordReader<LongWritable, NullWritable> {
       long startRow;
       long finishedRows;
       long totalRows;
+      LongWritable key = null;
 
-      public RangeRecordReader(RangeInputSplit split) {
-        startRow = split.firstRow;
+      public RangeRecordReader() {
+      }
+      
+      public void initialize(InputSplit split, TaskAttemptContext context) 
+          throws IOException, InterruptedException {
+        startRow = ((RangeInputSplit)split).firstRow;
         finishedRows = 0;
-        totalRows = split.rowCount;
+        totalRows = ((RangeInputSplit)split).rowCount;
       }
 
       public void close() throws IOException {
         // NOTHING
       }
 
-      public LongWritable createKey() {
-        return new LongWritable();
+      public LongWritable getCurrentKey() {
+        return key;
       }
 
-      public NullWritable createValue() {
+      public NullWritable getCurrentValue() {
         return NullWritable.get();
       }
 
-      public long getPos() throws IOException {
-        return finishedRows;
-      }
-
       public float getProgress() throws IOException {
         return finishedRows / (float) totalRows;
       }
 
-      public boolean next(LongWritable key, 
-                          NullWritable value) {
+      public boolean nextKeyValue() {
+        if (key == null) {
+          key = new LongWritable();
+        }
         if (finishedRows < totalRows) {
           key.set(startRow + finishedRows);
           finishedRows += 1;
@@ -155,24 +161,25 @@
     }
 
     public RecordReader<LongWritable, NullWritable> 
-      getRecordReader(InputSplit split, JobConf job,
-                      Reporter reporter) throws IOException {
-      return new RangeRecordReader((RangeInputSplit) split);
+        createRecordReader(InputSplit split, TaskAttemptContext context) 
+        throws IOException {
+      return new RangeRecordReader();
     }
 
     /**
      * Create the desired number of splits, dividing the number of rows
      * between the mappers.
      */
-    public InputSplit[] getSplits(JobConf job, 
-                                  int numSplits) {
+    public List<InputSplit> getSplits(JobContext job) {
       long totalRows = getNumberOfRows(job);
+      int numSplits = job.getConfiguration().getInt(JobContext.NUM_MAPS, 1);
       LOG.info("Generating " + totalRows + " using " + numSplits);
-      InputSplit[] splits = new InputSplit[numSplits];
+      List<InputSplit> splits = new ArrayList<InputSplit>();
       long currentRow = 0;
-      for(int split=0; split < numSplits; ++split) {
-        long goal = (long) Math.ceil(totalRows * (double)(split+1) / numSplits);
-        splits[split] = new RangeInputSplit(currentRow, goal - currentRow);
+      for(int split = 0; split < numSplits; ++split) {
+        long goal = 
+          (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
+        splits.add(new RangeInputSplit(currentRow, goal - currentRow));
         currentRow = goal;
       }
       return splits;
@@ -180,20 +187,20 @@
 
   }
   
-  static long getNumberOfRows(JobConf job) {
-    return job.getLong(NUM_ROWS, 0);
+  static long getNumberOfRows(JobContext job) {
+    return job.getConfiguration().getLong(NUM_ROWS, 0);
   }
   
-  static void setNumberOfRows(JobConf job, long numRows) {
-    job.setLong(NUM_ROWS, numRows);
+  static void setNumberOfRows(Job job, long numRows) {
+    job.getConfiguration().setLong(NUM_ROWS, numRows);
   }
 
   /**
    * The Mapper class that given a row number, will generate the appropriate 
    * output line.
    */
-  public static class SortGenMapper extends MapReduceBase 
-      implements Mapper<LongWritable, NullWritable, Text, Text> {
+  public static class SortGenMapper 
+      extends Mapper<LongWritable, NullWritable, Text, Text> {
 
     private Text key = new Text();
     private Text value = new Text();
@@ -208,19 +215,18 @@
     private Counter checksumCounter;
 
     public void map(LongWritable row, NullWritable ignored,
-                    OutputCollector<Text, Text> output,
-                    Reporter reporter) throws IOException {
+        Context context) throws IOException, InterruptedException {
       if (rand == null) {
         rowId = new Unsigned16(row.get());
         rand = Random16.skipAhead(rowId);
-        checksumCounter = reporter.getCounter(Counters.CHECKSUM);
+        checksumCounter = context.getCounter(Counters.CHECKSUM);
       }
       Random16.nextRand(rand);
       GenSort.generateRecord(buffer, rand, rowId);
       key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
       value.set(buffer, TeraInputFormat.KEY_LENGTH, 
                 TeraInputFormat.VALUE_LENGTH);
-      output.collect(key, value);
+      context.write(key, value);
       crc32.reset();
       crc32.update(buffer, 0, 
                    TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
@@ -230,7 +236,7 @@
     }
 
     @Override
-    public void close() {
+    public void cleanup(Context context) {
       checksumCounter.increment(total.getLow8());
     }
   }
@@ -271,15 +277,16 @@
   /**
    * @param args the cli arguments
    */
-  public int run(String[] args) throws IOException {
-    JobConf job = (JobConf) getConf();
+  public int run(String[] args) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     if (args.length != 2) {
       usage();
-      return 1;
+      return 2;
     }
     setNumberOfRows(job, parseHumanLong(args[0]));
     Path outputDir = new Path(args[1]);
-    if (outputDir.getFileSystem(job).exists(outputDir)) {
+    if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
       throw new IOException("Output directory " + outputDir + 
                             " already exists.");
     }
@@ -290,14 +297,13 @@
     job.setNumReduceTasks(0);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    job.setInputFormat(RangeInputFormat.class);
-    job.setOutputFormat(TeraOutputFormat.class);
-    JobClient.runJob(job);
-    return 0;
+    job.setInputFormatClass(RangeInputFormat.class);
+    job.setOutputFormatClass(TeraOutputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraGen(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
     System.exit(res);
   }
 

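The rewritten RangeInputFormat.getSplits() keeps the original row-partitioning arithmetic while returning a List<InputSplit> and reading the map count from the configuration. A minimal, self-contained sketch of that arithmetic follows, using hypothetical row and split counts that are not part of the commit:

    public class RangeSplitSketch {
      public static void main(String[] args) {
        long totalRows = 10;   // illustrative values only
        int numSplits = 3;
        long currentRow = 0;
        for (int split = 0; split < numSplits; ++split) {
          // split i ends at ceil(totalRows * (i + 1) / numSplits)
          long goal = (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
          // prints firstRow/rowCount pairs 0/4, 4/3, 7/3
          System.out.println("split " + split + ": firstRow=" + currentRow
              + " rowCount=" + (goal - currentRow));
          currentRow = goal;
        }
      }
    }
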
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java Tue Jan 26 14:02:53 2010
@@ -22,19 +22,21 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.LineRecordReader;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.StringUtils;
@@ -47,13 +49,15 @@
 public class TeraInputFormat extends FileInputFormat<Text,Text> {
 
   static final String PARTITION_FILENAME = "_partition.lst";
-  private static final String NUM_PARTITIONS = "terasort.num.partitions";
-  private static final String SAMPLE_SIZE = "terasort.partitions.sample";
+  private static final String NUM_PARTITIONS = 
+    "mapreduce.terasort.num.partitions";
+  private static final String SAMPLE_SIZE = 
+    "mapreduce.terasort.partitions.sample";
   static final int KEY_LENGTH = 10;
   static final int VALUE_LENGTH = 90;
   static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
-  private static JobConf lastConf = null;
-  private static InputSplit[] lastResult = null;
+  private static JobContext lastContext = null;
+  private static List<InputSplit> lastResult = null;
 
   static class TeraFileSplit extends FileSplit {
     private String[] locations;
@@ -136,24 +140,25 @@
    * Use the input splits to take samples of the input and generate sample
    * keys. By default reads 100,000 keys from 10 locations in the input, sorts
    * them and picks N-1 keys to generate N equally sized partitions.
-   * @param conf the job to sample
+   * @param job the job to sample
    * @param partFile where to write the output file to
    * @throws IOException if something goes wrong
    */
-  public static void writePartitionFile(final JobConf conf, 
-                                        Path partFile) throws IOException {
+  public static void writePartitionFile(final JobContext job, 
+      Path partFile) throws IOException, InterruptedException  {
     long t1 = System.currentTimeMillis();
+    Configuration conf = job.getConfiguration();
     final TeraInputFormat inFormat = new TeraInputFormat();
     final TextSampler sampler = new TextSampler();
-    int partitions = conf.getNumReduceTasks();
+    int partitions = job.getNumReduceTasks();
     long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
-    final InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
+    final List<InputSplit> splits = inFormat.getSplits(job);
     long t2 = System.currentTimeMillis();
     System.out.println("Computing input splits took " + (t2 - t1) + "ms");
-    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.length);
-    System.out.println("Sampling " + samples + " splits of " + splits.length);
+    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
+    System.out.println("Sampling " + samples + " splits of " + splits.size());
     final long recordsPerSample = sampleSize / samples;
-    final int sampleStep = splits.length / samples;
+    final int sampleStep = splits.size() / samples;
     Thread[] samplerReader = new Thread[samples];
     // take N samples from different parts of the input
     for(int i=0; i < samples; ++i) {
@@ -164,14 +169,16 @@
           setDaemon(true);
         }
         public void run() {
-          Text key = new Text();
-          Text value = new Text();
           long records = 0;
           try {
-            RecordReader<Text,Text> reader = 
-              inFormat.getRecordReader(splits[sampleStep * idx], conf, null);
-            while (reader.next(key, value)) {
-              sampler.addKey(key);
+            TaskAttemptContext context = new TaskAttemptContextImpl(
+              job.getConfiguration(), new TaskAttemptID());
+            RecordReader<Text, Text> reader = 
+              inFormat.createRecordReader(splits.get(sampleStep * idx),
+              context);
+            reader.initialize(splits.get(sampleStep * idx), context);
+            while (reader.nextKeyValue()) {
+              sampler.addKey(new Text(reader.getCurrentKey()));
               records += 1;
               if (recordsPerSample <= records) {
                 break;
@@ -181,6 +188,8 @@
             System.err.println("Got an exception while reading splits " +
                 StringUtils.stringifyException(ie));
             System.exit(-1);
+          } catch (InterruptedException e) {
+        	  
           }
         }
       };
@@ -203,46 +212,47 @@
     System.out.println("Computing parititions took " + (t3 - t2) + "ms");
   }
 
-  static class TeraRecordReader implements RecordReader<Text,Text> {
+  static class TeraRecordReader extends RecordReader<Text,Text> {
     private FSDataInputStream in;
     private long offset;
     private long length;
     private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
     private byte[] buffer = new byte[RECORD_LENGTH];
+    private Text key;
+    private Text value;
 
-    public TeraRecordReader(Configuration job, 
-                            FileSplit split) throws IOException {
-      Path p = split.getPath();
-      FileSystem fs = p.getFileSystem(job);
+    public TeraRecordReader() throws IOException {
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      Path p = ((FileSplit)split).getPath();
+      FileSystem fs = p.getFileSystem(context.getConfiguration());
       in = fs.open(p);
-      long start = split.getStart();
+      long start = ((FileSplit)split).getStart();
       // find the offset to start at a record boundary
       offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
       in.seek(start + offset);
-      length = split.getLength();
+      length = ((FileSplit)split).getLength();
     }
 
     public void close() throws IOException {
       in.close();
     }
 
-    public Text createKey() {
-      return new Text();
-    }
-
-    public Text createValue() {
-      return new Text();
+    public Text getCurrentKey() {
+      return key;
     }
 
-    public long getPos() throws IOException {
-      return in.getPos();
+    public Text getCurrentValue() {
+      return value;
     }
 
     public float getProgress() throws IOException {
       return (float) offset / length;
     }
 
-    public boolean next(Text key, Text value) throws IOException {
+    public boolean nextKeyValue() throws IOException {
       if (offset >= length) {
         return false;
       }
@@ -258,6 +268,12 @@
         }
         read += newRead;
       }
+      if (key == null) {
+        key = new Text();
+      }
+      if (value == null) {
+        value = new Text();
+      }
       key.set(buffer, 0, KEY_LENGTH);
       value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
       offset += RECORD_LENGTH;
@@ -267,31 +283,30 @@
 
   @Override
   public RecordReader<Text, Text> 
-      getRecordReader(InputSplit split,
-                      JobConf job, 
-                      Reporter reporter) throws IOException {
-    return new TeraRecordReader(job, (FileSplit) split);
+      createRecordReader(InputSplit split, TaskAttemptContext context) 
+      throws IOException {
+    return new TeraRecordReader();
   }
 
-  @Override
   protected FileSplit makeSplit(Path file, long start, long length, 
                                 String[] hosts) {
     return new TeraFileSplit(file, start, length, hosts);
   }
 
   @Override
-  public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
-    if (conf == lastConf) {
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    if (job == lastContext) {
       return lastResult;
     }
     long t1, t2, t3;
     t1 = System.currentTimeMillis();
-    lastConf = conf;
-    lastResult = super.getSplits(conf, splits);
+    lastContext = job;
+    lastResult = super.getSplits(job);
     t2 = System.currentTimeMillis();
     System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
-    if (conf.getBoolean("terasort.use.terascheduler", true)) {
-      TeraScheduler scheduler = new TeraScheduler((FileSplit[]) lastResult, conf);
+    if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
+      TeraScheduler scheduler = new TeraScheduler(
+        lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
       lastResult = scheduler.getNewFileSplits();
       t3 = System.currentTimeMillis(); 
       System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");

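TeraRecordReader.initialize() carries over the record-boundary alignment from the old constructor: a split that starts mid-record is advanced to the next 100-byte boundary. A small, self-contained sketch of that offset computation, with illustrative byte positions only:

    public class RecordOffsetSketch {
      static final int RECORD_LENGTH = 100;   // KEY_LENGTH + VALUE_LENGTH

      // distance from 'start' to the next record boundary (0 if already aligned)
      static long alignOffset(long start) {
        return (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
      }

      public static void main(String[] args) {
        System.out.println(alignOffset(250));  // 50 -> reading begins at byte 300
        System.out.println(alignOffset(300));  // 0  -> already on a boundary
      }
    }
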
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java Tue Jan 26 14:02:53 2010
@@ -24,46 +24,43 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
  * An output format that writes the key and value appended together.
  */
 public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
-  static final String FINAL_SYNC_ATTRIBUTE = "terasort.final.sync";
+  static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
+  private OutputCommitter committer = null;
 
   /**
    * Set the requirement for a final sync before the stream is closed.
    */
-  public static void setFinalSync(JobConf conf, boolean newValue) {
-    conf.setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+  static void setFinalSync(JobContext job, boolean newValue) {
+    job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
   }
 
   /**
    * Does the user want a final sync at close?
    */
-  public static boolean getFinalSync(JobConf conf) {
-    return conf.getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+  public static boolean getFinalSync(JobContext job) {
+    return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
   }
 
-  static class TeraRecordWriter implements RecordWriter<Text,Text> {
+  static class TeraRecordWriter extends RecordWriter<Text,Text> {
     private boolean finalSync = false;
     private FSDataOutputStream out;
 
     public TeraRecordWriter(FSDataOutputStream out,
-                            JobConf conf) {
-      finalSync = getFinalSync(conf);
+                            JobContext job) {
+      finalSync = getFinalSync(job);
       this.out = out;
     }
 
@@ -73,7 +70,7 @@
       out.write(value.getBytes(), 0, value.getLength());
     }
     
-    public void close(Reporter reporter) throws IOException {
+    public void close(TaskAttemptContext context) throws IOException {
       if (finalSync) {
         out.sync();
       }
@@ -82,37 +79,41 @@
   }
 
   @Override
-  public void checkOutputSpecs(FileSystem ignored, 
-                               JobConf job
+  public void checkOutputSpecs(JobContext job
                               ) throws InvalidJobConfException, IOException {
-    // Ensure that the output directory is set and not already there
+    // Ensure that the output directory is set
     Path outDir = getOutputPath(job);
     if (outDir == null) {
       throw new InvalidJobConfException("Output directory not set in JobConf.");
     }
   }
 
-  public RecordWriter<Text,Text> getRecordWriter(FileSystem ignored,
-                                                 JobConf job,
-                                                 String name,
-                                                 Progressable progress
+  public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
                                                  ) throws IOException {
-    Path dir = getWorkOutputPath(job);
-    FileSystem fs = dir.getFileSystem(job);
-    FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+    Path file = getDefaultWorkFile(job, "");
+    FileSystem fs = file.getFileSystem(job.getConfiguration());
+    FSDataOutputStream fileOut = fs.create(file);
     return new TeraRecordWriter(fileOut, job);
   }
   
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+      throws IOException {
+    if (committer == null) {
+      Path output = getOutputPath(context);
+      committer = new TeraOutputCommitter(output, context);
+    }
+    return committer;
+  }
+
   public static class TeraOutputCommitter extends FileOutputCommitter {
 
-    @Override
-    public void commitJob(JobContext jobContext) {
+    public TeraOutputCommitter(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
     }
 
     @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-      return taskContext.getTaskAttemptID().getTaskID().getTaskType() ==
-               TaskType.REDUCE;
+    public void commitJob(JobContext jobContext) {
     }
 
     @Override

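The final-sync switch is now namespaced as mapreduce.terasort.final.sync and is read through the JobContext configuration. A small, self-contained sketch of setting it directly on a Configuration, equivalent to what setFinalSync(job, true) does after this patch:

    import org.apache.hadoop.conf.Configuration;

    public class FinalSyncSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // same effect as TeraOutputFormat.setFinalSync(job, true)
        conf.setBoolean("mapreduce.terasort.final.sync", true);
        System.out.println(conf.getBoolean("mapreduce.terasort.final.sync", false)); // true
      }
    }
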
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java Tue Jan 26 14:02:53 2010
@@ -25,21 +25,24 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.terasort.TeraInputFormat.TeraFileSplit;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 class TeraScheduler {
+  static String USE = "mapreduce.terasort.use.terascheduler";
   private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
-  private InputSplit[] splits;
+  private Split[] splits;
   private List<Host> hosts = new ArrayList<Host>();
   private int slotsPerHost;
   private int remainingSplits = 0;
   private FileSplit[] realSplits = null;
 
-  static class InputSplit {
+  static class Split {
     String filename;
     boolean isAssigned = false;
     List<Host> locations = new ArrayList<Host>();
-    InputSplit(String filename) {
+    Split(String filename) {
       this.filename = filename;
     }
     public String toString() {
@@ -55,7 +58,7 @@
   }
   static class Host {
     String hostname;
-    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<Split> splits = new ArrayList<Split>();
     Host(String hostname) {
       this.hostname = hostname;
     }
@@ -92,11 +95,11 @@
     }
     // read the blocks
     List<String> splitLines = readFile(splitFilename);
-    splits = new InputSplit[splitLines.size()];
+    splits = new Split[splitLines.size()];
     remainingSplits = 0;
     for(String line: splitLines) {
       StringTokenizer itr = new StringTokenizer(line);
-      InputSplit newSplit = new InputSplit(itr.nextToken());
+      Split newSplit = new Split(itr.nextToken());
       splits[remainingSplits++] = newSplit;
       while (itr.hasMoreTokens()) {
         Host host = hostIds.get(itr.nextToken());
@@ -109,11 +112,11 @@
   public TeraScheduler(FileSplit[] realSplits,
                        Configuration conf) throws IOException {
     this.realSplits = realSplits;
-    this.slotsPerHost = conf.getInt("mapred.tasktracker.map.tasks.maximum", 4);
+    this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
     Map<String, Host> hostTable = new HashMap<String, Host>();
-    splits = new InputSplit[realSplits.length];
+    splits = new Split[realSplits.length];
     for(FileSplit realSplit: realSplits) {
-      InputSplit split = new InputSplit(realSplit.getPath().toString());
+      Split split = new Split(realSplit.getPath().toString());
       splits[remainingSplits++] = split;
       for(String hostname: realSplit.getLocations()) {
         Host host = hostTable.get(hostname);
@@ -148,8 +151,8 @@
     int tasksToPick = Math.min(slotsPerHost, 
                                (int) Math.ceil((double) remainingSplits / 
                                                hosts.size()));
-    InputSplit[] best = new InputSplit[tasksToPick];
-    for(InputSplit cur: host.splits) {
+    Split[] best = new Split[tasksToPick];
+    for(Split cur: host.splits) {
       LOG.debug("  examine: " + cur.filename + " " + cur.locations.size());
       int i = 0;
       while (i < tasksToPick && best[i] != null && 
@@ -177,7 +180,7 @@
       }
     }
     // for the non-chosen blocks, remove this host
-    for(InputSplit cur: host.splits) {
+    for(Split cur: host.splits) {
       if (!cur.isAssigned) {
         cur.locations.remove(host);
       }
@@ -200,7 +203,7 @@
    *    best host as the only host.
    * @throws IOException
    */
-  public FileSplit[] getNewFileSplits() throws IOException {
+  public List<InputSplit> getNewFileSplits() throws IOException {
     solve();
     FileSplit[] result = new FileSplit[realSplits.length];
     int left = 0;
@@ -215,7 +218,11 @@
         result[right--] = realSplits[i];
       }
     }
-    return result;
+    List<InputSplit> ret = new ArrayList<InputSplit>();
+    for (FileSplit fs : result) {
+      ret.add(fs);
+    }
+    return ret;
   }
 
   public static void main(String[] args) throws IOException {
@@ -225,7 +232,7 @@
     }
     LOG.info("starting solve");
     problem.solve();
-    List<InputSplit> leftOvers = new ArrayList<InputSplit>();
+    List<Split> leftOvers = new ArrayList<Split>();
     for(int i=0; i < problem.splits.length; ++i) {
       if (problem.splits[i].isAssigned) {
         System.out.println("sched: " + problem.splits[i]);        
@@ -233,7 +240,7 @@
         leftOvers.add(problem.splits[i]);
       }
     }
-    for(InputSplit cur: leftOvers) {
+    for(Split cur: leftOvers) {
       System.out.println("left: " + cur);
     }
     System.out.println("left over: " + leftOvers.size());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Tue Jan 26 14:02:53 2010
@@ -25,17 +25,17 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.examples.terasort.TeraOutputFormat.TeraOutputCommitter;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -48,14 +48,18 @@
  */
 public class TeraSort extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(TeraSort.class);
+  static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
+  static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";
 
   /**
    * A partitioner that splits text keys into roughly equal partitions
    * in a global sorted order.
    */
-  static class TotalOrderPartitioner implements Partitioner<Text,Text>{
+  static class TotalOrderPartitioner extends Partitioner<Text,Text>
+      implements Configurable {
     private TrieNode trie;
     private Text[] splitPoints;
+    private Configuration conf;
 
     /**
      * A generic trie node
@@ -147,9 +151,9 @@
      * @return the strings to split the partitions on
      * @throws IOException
      */
-    private static Text[] readPartitions(FileSystem fs, Path p, 
-                                         JobConf job) throws IOException {
-      int reduces = job.getNumReduceTasks();
+    private static Text[] readPartitions(FileSystem fs, Path p,
+        Configuration conf) throws IOException {
+      int reduces = conf.getInt(JobContext.NUM_REDUCES, 1);
       Text[] result = new Text[reduces - 1];
       DataInputStream reader = fs.open(p);
       for(int i=0; i < reduces - 1; ++i) {
@@ -201,17 +205,22 @@
       return result;
     }
 
-    public void configure(JobConf job) {
+    public void setConf(Configuration conf) {
       try {
-        FileSystem fs = FileSystem.getLocal(job);
+        FileSystem fs = FileSystem.getLocal(conf);
+        this.conf = conf;
         Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
-        splitPoints = readPartitions(fs, partFile, job);
+        splitPoints = readPartitions(fs, partFile, conf);
         trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
       } catch (IOException ie) {
         throw new IllegalArgumentException("can't read paritions file", ie);
       }
     }
 
+    public Configuration getConf() {
+      return conf;
+    }
+    
     public TotalOrderPartitioner() {
     }
 
@@ -225,13 +234,21 @@
    * A total order partitioner that assigns keys based on their first 
    * PREFIX_LENGTH bytes, assuming a flat distribution.
    */
-  public static class SimplePartitioner implements Partitioner<Text, Text>{
+  public static class SimplePartitioner extends Partitioner<Text, Text>
+      implements Configurable {
     int prefixesPerReduce;
     private static final int PREFIX_LENGTH = 3;
-    public void configure(JobConf job) {
+    private Configuration conf = null;
+    public void setConf(Configuration conf) {
+      this.conf = conf;
       prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / 
-                                          (float) job.getNumReduceTasks());
+        (float) conf.getInt(JobContext.NUM_REDUCES, 1));
     }
+    
+    public Configuration getConf() {
+      return conf;
+    }
+    
     @Override
     public int getPartition(Text key, Text value, int numPartitions) {
       byte[] bytes = key.getBytes();
@@ -244,35 +261,36 @@
     }
   }
 
-  public static boolean getUseSimplePartitioner(Configuration conf) {
-    return conf.getBoolean("terasort.partitioner.simple", false);
+  public static boolean getUseSimplePartitioner(JobContext job) {
+    return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false);
   }
 
-  public static void setUseSimplePartitioner(Configuration conf,
-                                             boolean value) {
-    conf.setBoolean("terasort.partitioner.simple", value);
+  public static void setUseSimplePartitioner(Job job, boolean value) {
+    job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value);
+  }
+
+  public static int getOutputReplication(JobContext job) {
+    return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);
+  }
+
+  public static void setOutputReplication(Job job, int value) {
+    job.getConfiguration().setInt(OUTPUT_REPLICATION, value);
   }
 
   public int run(String[] args) throws Exception {
     LOG.info("starting");
-    JobConf job = (JobConf) getConf();
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     Path inputDir = new Path(args[0]);
     Path outputDir = new Path(args[1]);
     boolean useSimplePartitioner = getUseSimplePartitioner(job);
-    FileSystem outputFileSystem = outputDir.getFileSystem(job);
-    outputDir = outputDir.makeQualified(outputFileSystem);
-    if (outputFileSystem.exists(outputDir)) {
-      throw new IOException("Output directory " + outputDir + 
-                            " already exists.");
-    }
     TeraInputFormat.setInputPaths(job, inputDir);
     FileOutputFormat.setOutputPath(job, outputDir);
     job.setJobName("TeraSort");
     job.setJarByClass(TeraSort.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    job.setInputFormat(TeraInputFormat.class);
-    job.setOutputFormat(TeraOutputFormat.class);
+    job.setInputFormatClass(TeraInputFormat.class);
+    job.setOutputFormatClass(TeraOutputFormat.class);
     if (useSimplePartitioner) {
       job.setPartitionerClass(SimplePartitioner.class);
     } else {
@@ -282,27 +300,25 @@
       URI partitionUri = new URI(partitionFile.toString() +
                                  "#" + TeraInputFormat.PARTITION_FILENAME);
       TeraInputFormat.writePartitionFile(job, partitionFile);
-      DistributedCache.addCacheFile(partitionUri, job);
-      DistributedCache.createSymlink(job);    
+      job.addCacheFile(partitionUri);
+      job.createSymlink();    
       long end = System.currentTimeMillis();
       System.out.println("Spent " + (end - start) + "ms computing partitions.");
       job.setPartitionerClass(TotalOrderPartitioner.class);
     }
-    job.setOutputCommitter(TeraOutputCommitter.class);
     
-    job.setInt("dfs.replication", 
-               job.getInt("terasort.output.replication", 1));
+    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
     TeraOutputFormat.setFinalSync(job, true);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     LOG.info("done");
-    return 0;
+    return ret;
   }
 
   /**
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraSort(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraSort(), args);
     System.exit(res);
   }
 

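Both partitioners now extend org.apache.hadoop.mapreduce.Partitioner and pick up their settings through the Configurable interface instead of configure(JobConf). A minimal, self-contained sketch of that shape, with a placeholder hash partitioning rather than the committed trie or prefix logic:

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ConfigurablePartitionerSketch extends Partitioner<Text, Text>
        implements Configurable {
      private Configuration conf;

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;   // the framework injects the job configuration here
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
        // placeholder partitioning for illustration only
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }
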
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java Tue Jan 26 14:02:53 2010
@@ -19,23 +19,20 @@
 package org.apache.hadoop.examples.terasort;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.zip.Checksum;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -63,10 +60,8 @@
     return b.toString();
   }
 
-  static class ValidateMapper extends MapReduceBase 
-      implements Mapper<Text,Text,Text,Text> {
+  static class ValidateMapper extends Mapper<Text,Text,Text,Text> {
     private Text lastKey;
-    private OutputCollector<Text,Text> output;
     private String filename;
     private Unsigned16 checksum = new Unsigned16();
     private Unsigned16 tmp = new Unsigned16();
@@ -75,27 +70,22 @@
     /**
      * Get the final part of the input name
      * @param split the input split
-     * @return the "part-00000" for the input
+     * @return the "part-r-00000" for the input
      */
     private String getFilename(FileSplit split) {
       return split.getPath().getName();
     }
 
-    private int getPartition(FileSplit split) {
-      return Integer.parseInt(split.getPath().getName().substring(5));
-    }
-
-    public void map(Text key, Text value, OutputCollector<Text,Text> output,
-                    Reporter reporter) throws IOException {
+    public void map(Text key, Text value, Context context) 
+        throws IOException, InterruptedException {
       if (lastKey == null) {
-        FileSplit fs = (FileSplit) reporter.getInputSplit();
+        FileSplit fs = (FileSplit) context.getInputSplit();
         filename = getFilename(fs);
-        output.collect(new Text(filename + ":begin"), key);
+        context.write(new Text(filename + ":begin"), key);
         lastKey = new Text();
-        this.output = output;
       } else {
         if (key.compareTo(lastKey) < 0) {
-          output.collect(ERROR, new Text("misorder in " + filename + 
+          context.write(ERROR, new Text("misorder in " + filename + 
                                          " between " + textifyBytes(lastKey) + 
                                          " and " + textifyBytes(key)));
         }
@@ -109,10 +99,11 @@
       lastKey.set(key);
     }
     
-    public void close() throws IOException {
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException  {
       if (lastKey != null) {
-        output.collect(new Text(filename + ":end"), lastKey);
-        output.collect(CHECKSUM, new Text(checksum.toString()));
+        context.write(new Text(filename + ":end"), lastKey);
+        context.write(CHECKSUM, new Text(checksum.toString()));
       }
     }
   }
@@ -122,34 +113,31 @@
    * boundary keys are always increasing.
    * Also passes any error reports along intact.
    */
-  static class ValidateReducer extends MapReduceBase 
-      implements Reducer<Text,Text,Text,Text> {
+  static class ValidateReducer extends Reducer<Text,Text,Text,Text> {
     private boolean firstKey = true;
     private Text lastKey = new Text();
     private Text lastValue = new Text();
-    public void reduce(Text key, Iterator<Text> values,
-                       OutputCollector<Text, Text> output, 
-                       Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterable<Text> values,
+        Context context) throws IOException, InterruptedException  {
       if (ERROR.equals(key)) {
-        while(values.hasNext()) {
-          output.collect(key, values.next());
+        for (Text val : values) {
+          context.write(key, val);
         }
       } else if (CHECKSUM.equals(key)) {
         Unsigned16 tmp = new Unsigned16();
         Unsigned16 sum = new Unsigned16();
-        while (values.hasNext()) {
-          String val = values.next().toString();
-          tmp.set(val);
+        for (Text val : values) {
+          tmp.set(val.toString());
           sum.add(tmp);
         }
-        output.collect(CHECKSUM, new Text(sum.toString()));
+        context.write(CHECKSUM, new Text(sum.toString()));
       } else {
-        Text value = values.next();
+        Text value = values.iterator().next();
         if (firstKey) {
           firstKey = false;
         } else {
           if (value.compareTo(lastValue) < 0) {
-            output.collect(ERROR, 
+            context.write(ERROR, 
                            new Text("bad key partitioning:\n  file " + 
                                     lastKey + " key " + 
                                     textifyBytes(lastValue) +
@@ -169,7 +157,7 @@
   }
 
   public int run(String[] args) throws Exception {
-    JobConf job = (JobConf) getConf();
+    Job job = Job.getInstance(new Cluster(getConf()), getConf());
     if (args.length != 2) {
       usage();
       return 1;
@@ -185,18 +173,16 @@
     // force a single reducer
     job.setNumReduceTasks(1);
     // force a single split 
-    job.setLong(org.apache.hadoop.mapreduce.lib.input.
-                FileInputFormat.SPLIT_MINSIZE, Long.MAX_VALUE);
-    job.setInputFormat(TeraInputFormat.class);
-    JobClient.runJob(job);
-    return 0;
+    FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
+    job.setInputFormatClass(TeraInputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   /**
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new JobConf(), new TeraValidate(), args);
+    int res = ToolRunner.run(new Configuration(), new TeraValidate(), args);
     System.exit(res);
   }
 

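ValidateReducer now follows the new Reducer contract: values arrive as an Iterable and output goes through the Context. A minimal, self-contained sketch of that shape; the committed reducer additionally checks boundary ordering and sums checksums:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class PassThroughReducerSketch extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        // iterate the grouped values and write each pair back out
        for (Text value : values) {
          context.write(key, value);
        }
      }
    }
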
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/compile
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/compile?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/compile (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/compile Tue Jan 26 14:02:53 2010
@@ -1,4 +1,16 @@
 #!/usr/bin/env bash
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
 
 export HADOOP_HOME=../../..
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/compile
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/compile?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/compile (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/compile Tue Jan 26 14:02:53 2010
@@ -1,4 +1,16 @@
 #!/usr/bin/env bash
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
 
 export HADOOP_HOME=../../../../..
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/wordcountaggregator.spec
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/wordcountaggregator.spec?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/wordcountaggregator.spec (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/python/pyAbacus/wordcountaggregator.spec Tue Jan 26 14:02:53 2010
@@ -1,4 +1,21 @@
 <?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
 <configuration>

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/java:713112
 /hadoop/core/trunk/src/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/java:804974-885774
+/hadoop/mapreduce/trunk/src/java:804974-903221

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml Tue Jan 26 14:02:53 2010
@@ -1,4 +1,20 @@
 <?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
 <!-- Do not modify this file directly.  Instead, copy entries that you -->
@@ -175,14 +191,14 @@
 </property>
 
 <property>
-  <name>mapreduce.tasktracker.memorycalculatorplugin</name>
+  <name>mapreduce.tasktracker.resourcecalculatorplugin</name>
   <value></value>
   <description>
-   Name of the class whose instance will be used to query memory information
+   Name of the class whose instance will be used to query resource information
    on the tasktracker.
    
    The class must be an instance of 
-   org.apache.hadoop.util.MemoryCalculatorPlugin. If the value is null, the
+   org.apache.hadoop.util.ResourceCalculatorPlugin. If the value is null, the
    tasktracker attempts to use a class appropriate to the platform. 
    Currently, the only platform supported is Linux.
   </description>
@@ -248,6 +264,17 @@
   <description>The class responsible for scheduling the tasks.</description>
 </property>
 
+
+<property>
+  <name>mapreduce.job.split.metainfo.maxsize</name>
+  <value>10000000</value>
+  <description>The maximum permissible size of the split metainfo file. 
+  The JobTracker won't attempt to read split metainfo files bigger than
+  the configured value.
+  No limits if set to -1.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.jobtracker.taskscheduler.maxrunningtasks.perjob</name>
   <value></value>
@@ -285,18 +312,17 @@
 <property>
   <name>mapreduce.reduce.shuffle.connect.timeout</name>
   <value>180000</value>
-  <description>Expert: Cluster-wide configuration. The maximum amount of
-  time (in milli seconds) reduce task spends in trying to connect to a
-  tasktracker for getting map output.
+  <description>Expert: The maximum amount of time (in milli seconds) reduce
+  task spends in trying to connect to a tasktracker for getting map output.
   </description>
 </property>
 
 <property>
   <name>mapreduce.reduce.shuffle.read.timeout</name>
-  <value>30000</value>
-  <description>Expert: Cluster-wide configuration. The maximum amount of time
-  (in milli seconds) reduce task waits for map output data to be available
-  for reading after obtaining connection.
+  <value>180000</value>
+  <description>Expert: The maximum amount of time (in milli seconds) reduce
+  task waits for map output data to be available for reading after obtaining
+  connection.
   </description>
 </property>
 
@@ -718,7 +744,7 @@
   </property>
 
   <property>
-    <name>mapreduce.client.progerssmonitor.pollinterval</name>
+    <name>mapreduce.client.progressmonitor.pollinterval</name>
     <value>1000</value>
     <description>The interval (in milliseconds) between which the JobClient
     reports status to the console and checks for job completion. You may want to set this

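The new mapreduce.job.split.metainfo.maxsize property caps the size of the split metainfo file the JobTracker will read; -1 removes the cap. A small, self-contained sketch of overriding it from client code, with an illustrative value only:

    import org.apache.hadoop.conf.Configuration;

    public class SplitMetaInfoLimitSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong("mapreduce.job.split.metainfo.maxsize", -1L);   // no limit
        System.out.println(
            conf.getLong("mapreduce.job.split.metainfo.maxsize", 10000000L)); // -1
      }
    }
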
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java Tue Jan 26 14:02:53 2010
@@ -26,21 +26,22 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.log4j.LogManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
 
 /** 
  * The main() for child processes. 
@@ -70,11 +71,12 @@
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),
         firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
     
-    // file name is passed thru env
+    //load token cache storage
     String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
-    FileSystem localFs = FileSystem.getLocal(defaultConf);
-    JobTokens jt = loadJobTokens(jobTokenFile, localFs);
-    LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
+    defaultConf.set(JobContext.JOB_TOKEN_FILE, jobTokenFile);
+    TokenStorage ts = TokenCache.loadTaskTokenStorage(defaultConf);
+    LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
+        "; from file=" + jobTokenFile);
     
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
@@ -153,8 +155,10 @@
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
         
+        // set job shuffle token
+        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
         // set the jobTokenFile into task
-        task.setJobTokens(jt);
+        task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
         
         // setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
         // can only see files down and under attemtdir only.
@@ -224,22 +228,4 @@
       LogManager.shutdown();
     }
   }
-  
-  /**
-   * load secret keys from a file
-   * @param jobTokenFile
-   * @param conf
-   * @throws IOException
-   */
-  private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS) 
-  throws IOException {
-    Path localJobTokenFile = new Path (jobTokenFile);
-    FSDataInputStream in = localFS.open(localJobTokenFile);
-    JobTokens jt = new JobTokens();
-    jt.readFields(in);
-        
-    LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
-    in.close();
-    return jt;
-  }
 }

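The Child changes above replace the hand-rolled JobTokens file reader with the TokenCache/TokenStorage classes and derive the shuffle secret from the job token's password. A minimal sketch of that flow, assuming the branch-local API behaves exactly as it is used in the hunk (loadTaskTokenStorage, getJobToken, and createSecretKey returning the javax.crypto.SecretKey the task stores):

    import java.io.IOException;
    import javax.crypto.SecretKey;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.mapreduce.security.TokenStorage;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    // Sketch only: mirrors the token handling shown in the hunk above.
    class ShuffleSecretSketch {
      @SuppressWarnings("unchecked")
      static SecretKey loadShuffleSecret(Configuration conf, String jobTokenFile)
          throws IOException {
        // the task JVM receives the token file path through the JOB_TOKEN_FILE env var
        conf.set(JobContext.JOB_TOKEN_FILE, jobTokenFile);
        TokenStorage ts = TokenCache.loadTaskTokenStorage(conf);
        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>) ts.getJobToken();
        // the token's password becomes the shared secret used to authenticate shuffle fetches
        return JobTokenSecretManager.createSecretKey(jt.getPassword());
      }
    }
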
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/CleanupQueue.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/CleanupQueue.java Tue Jan 26 14:02:53 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +39,7 @@
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
+   * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -49,22 +50,61 @@
     }
   }
   
-  public void addToQueue(FileSystem fs, Path...paths) {
-    cleanupThread.addToQueue(fs, paths);
+  /**
+   * Contains info related to the path of the file/dir to be deleted
+   */
+  static class PathDeletionContext {
+    String fullPath;// full path of file or dir
+    FileSystem fs;
+
+    public PathDeletionContext(FileSystem fs, String fullPath) {
+      this.fs = fs;
+      this.fullPath = fullPath;
+    }
+    
+    protected String getPathForCleanup() {
+      return fullPath;
+    }
+
+    /**
+     * Makes the path(and its subdirectories recursively) fully deletable
+     */
+    protected void enablePathForCleanup() throws IOException {
+      // Do nothing by default.
+      // Subclasses can override to provide enabling for deletion.
+    }
   }
 
-  private static class PathCleanupThread extends Thread {
+  /**
+   * Adds the paths to the queue of paths to be deleted by cleanupThread.
+   */
+  void addToQueue(PathDeletionContext... contexts) {
+    cleanupThread.addToQueue(contexts);
+  }
 
-    static class PathAndFS {
-      FileSystem fs;
-      Path path;
-      PathAndFS(FileSystem fs, Path path) {
-        this.fs = fs;
-        this.path = path;
-      }
+  protected static boolean deletePath(PathDeletionContext context)
+            throws IOException {
+    context.enablePathForCleanup();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to delete " + context.fullPath);
+    }
+    if (context.fs.exists(new Path(context.fullPath))) {
+      return context.fs.delete(new Path(context.fullPath), true);
     }
+    return true;
+  }
+
+  // currently used by tests only
+  protected boolean isQueueEmpty() {
+    return (cleanupThread.queue.size() == 0);
+  }
+
+  private static class PathCleanupThread extends Thread {
+
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathAndFS> queue = new LinkedBlockingQueue<PathAndFS>();
+    private LinkedBlockingQueue<PathDeletionContext> queue =
+      new LinkedBlockingQueue<PathDeletionContext>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -72,27 +112,34 @@
       start();
     }
 
-    public void addToQueue(FileSystem fs, Path... paths) {
-      for (Path p : paths) {
+    void addToQueue(PathDeletionContext[] contexts) {
+      for (PathDeletionContext context : contexts) {
         try {
-          queue.put(new PathAndFS(fs, p));
-        } catch (InterruptedException ie) {}
+          queue.put(context);
+        } catch(InterruptedException ie) {}
       }
     }
 
     public void run() {
-      LOG.debug(getName() + " started.");
-      PathAndFS pathAndFS = null;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + " started.");
+      }
+      PathDeletionContext context = null;
       while (true) {
         try {
-          pathAndFS = queue.take();
+          context = queue.take();
           // delete the path.
-          pathAndFS.fs.delete(pathAndFS.path, true);
-          LOG.debug("DELETED " + pathAndFS.path);
+          if (!deletePath(context)) {
+            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
+          }
+          else if (LOG.isDebugEnabled()) {
+            LOG.debug("DELETED " + context.fullPath);
+          }
         } catch (InterruptedException t) {
+          LOG.warn("Interrupted deletion of " + context.fullPath);
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path" + pathAndFS.path);
+          LOG.warn("Error deleting path " + context.fullPath + ": " + e);
         } 
       }
     }

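With the change above, callers hand the queue PathDeletionContext objects instead of a FileSystem plus Paths, and deletePath() first calls enablePathForCleanup() so subclasses can fix permissions before deleting. A small usage sketch, placed in org.apache.hadoop.mapred because addToQueue is package-private; the caller class itself is illustrative:

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;

    // Illustrative caller: queues local directories for asynchronous deletion.
    class CleanupQueueSketch {
      static void queueForDeletion(Configuration conf, String... fullPaths)
          throws IOException {
        FileSystem localFs = FileSystem.getLocal(conf);
        CleanupQueue cleanupQueue = new CleanupQueue();   // starts the daemon cleanup thread
        PathDeletionContext[] contexts = new PathDeletionContext[fullPaths.length];
        for (int i = 0; i < fullPaths.length; i++) {
          contexts[i] = new PathDeletionContext(localFs, fullPaths[i]);
        }
        cleanupQueue.addToQueue(contexts);                // deleted later by the cleanup thread
      }
    }
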
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Tue Jan 26 14:02:53 2010
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
@@ -29,6 +31,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 
 /**
  * The default implementation for controlling tasks.
@@ -119,14 +123,14 @@
     if (shexec != null) {
       if (Shell.WINDOWS) {
         //We don't do send kill process signal in case of windows as 
-        //already we have done a process.destroy() in termintateTaskJVM()
+        //already we have done a process.destroy() in terminateTaskJVM()
         return;
       }
       String pid = context.pid;
       if (pid != null) {
         if(ProcessTree.isSetsidAvailable) {
           ProcessTree.killProcessGroup(pid);
-        }else {
+        } else {
           ProcessTree.killProcess(pid);
         }
       }
@@ -134,8 +138,37 @@
   }
 
   @Override
-  public void initializeDistributedCache(InitializationContext context) {
-    // Do nothing.
+  void dumpTaskStack(TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    if (shexec != null) {
+      if (Shell.WINDOWS) {
+        // We don't use signals in Windows.
+        return;
+      }
+      String pid = context.pid;
+      if (pid != null) {
+        // Send SIGQUIT to get a stack dump
+        if (ProcessTree.isSetsidAvailable) {
+          ProcessTree.sigQuitProcessGroup(pid);
+        } else {
+          ProcessTree.sigQuitProcess(pid);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+      throws IOException {
+    Path localizedUniqueDir = context.getLocalizedUniqueDir();
+    try {
+      // Setting recursive execute permission on localized dir
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
+    } catch (InterruptedException ie) {
+      LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
+      throw new IOException(ie);
+    }
   }
 
   @Override
@@ -157,4 +190,21 @@
           + exitCode + ".");
     }
   }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+         throws IOException {
+    try {
+      FileUtil.chmod(context.fullPath, "ug+rwx", true);
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
+          " for deletion.");
+    } catch(IOException ioe) {
+      LOG.warn("Unable to change permissions of " + context.fullPath);
+    }
+  }
 }

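Both new hooks above come down to a recursive FileUtil.chmod: "+x" on the localized distributed-cache directory and "ug+rwx" on a path about to be deleted. A standalone sketch of the latter, using the same FileUtil.chmod(String, String, boolean) call as the patch; the wrapper class is illustrative and, unlike enableTaskForCleanup(), rethrows instead of logging:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileUtil;

    // Illustrative: the recursive chmod that makes a task directory deletable.
    class ChmodForCleanupSketch {
      static void makeDeletable(String fullPath) throws IOException {
        try {
          // "ug+rwx" with recurse=true, matching enableTaskForCleanup() above
          FileUtil.chmod(fullPath, "ug+rwx", true);
        } catch (InterruptedException e) {
          throw new IOException("Interrupted while setting permissions on " + fullPath, e);
        }
      }
    }
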
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Jan 26 14:02:53 2010
@@ -65,8 +65,10 @@
    * Version 27: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 28: Adding node health status to TaskStatus for MAPREDUCE-211
    * Version 29: Adding user name to the serialized Task for use by TT.
-   */
-  public static final long versionID = 29L;
+   * Version 30: Adding available memory and CPU usage information on TT to
+   *             TaskTrackerStatus for MAPREDUCE-1218
+   */             
+  public static final long versionID = 30L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

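The versionID bump to 30 means a tasktracker built before the MAPREDUCE-1218 memory and CPU fields were added cannot heartbeat into an upgraded jobtracker; the IPC layer rejects mismatched versions. A toy illustration of the comparison involved (the constants simply restate the values from the comment above):

    // Toy illustration only: the version comparison the IPC layer enforces.
    class ProtocolVersionSketch {
      static final long JOBTRACKER_VERSION = 30L;  // InterTrackerProtocol.versionID after this change
      static final long TASKTRACKER_VERSION = 29L; // a tracker compiled before MAPREDUCE-1218

      public static void main(String[] args) {
        if (TASKTRACKER_VERSION != JOBTRACKER_VERSION) {
          System.err.println("Protocol version mismatch: tracker speaks "
              + TASKTRACKER_VERSION + ", jobtracker expects " + JOBTRACKER_VERSION);
        }
      }
    }
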
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Jan 26 14:02:53 2010
@@ -30,10 +30,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 
 /**
  * IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -180,19 +179,21 @@
     Thread.currentThread().setContextClassLoader(classLoader);
     conf.setClassLoader(classLoader);
 
-    // split.dta file is used only by IsolationRunner. The file can now be in
-    // any of the configured local disks, so use LocalDirAllocator to find out
-    // where it is.
-    Path localSplit =
+    // split.dta/split.meta files are used only by IsolationRunner. 
+    // The file can now be in any of the configured local disks, 
+    // so use LocalDirAllocator to find out where it is.
+    Path localMetaSplit =
         new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
-            TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID()
-                .toString(), taskId.toString()), conf);
-    DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
-    String splitClass = Text.readString(splitFile);
-    BytesWritable split = new BytesWritable();
-    split.readFields(splitFile);
+            TaskTracker.getLocalSplitMetaFile(conf.getUser(), 
+              taskId.getJobID().toString(), taskId
+                .toString()), conf);
+    DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
+    TaskSplitIndex splitIndex = new TaskSplitIndex(); 
+    splitIndex.readFields(splitFile);
     splitFile.close();
-    Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, 1);
+
+    Task task = 
+      new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1);
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());
     return true;

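IsolationRunner now locates the small split.meta file on whichever configured local disk holds it and reads back a TaskSplitIndex, instead of deserializing the split payload itself. A sketch of that read, assuming the TaskSplitIndex API exactly as used in the hunk; the relative path argument stands in for the package-private TaskTracker.getLocalSplitMetaFile(...) helper:

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;

    // Hedged sketch: find split.meta on any local dir and read the index
    // that points at the real split data.
    class SplitMetaSketch {
      static TaskSplitIndex readSplitIndex(JobConf conf, String relativeMetaPath)
          throws IOException {
        Path localMetaSplit = new LocalDirAllocator(MRConfig.LOCAL_DIR)
            .getLocalPathToRead(relativeMetaPath, conf);   // searches all configured local dirs
        DataInputStream in = FileSystem.getLocal(conf).open(localMetaSplit);
        try {
          TaskSplitIndex splitIndex = new TaskSplitIndex();
          splitIndex.readFields(in);                       // location and offset of the split data
          return splitIndex;
        } finally {
          in.close();
        }
      }
    }
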
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java Tue Jan 26 14:02:53 2010
@@ -476,6 +476,13 @@
   }
   
   /**
+   * Get a handle to the Cluster
+   */
+  public Cluster getClusterHandle() {
+    return cluster;
+  }
+  
+  /**
    * Submit a job to the MR system.
    * 
    * This returns a handle to the {@link RunningJob} which can be used to track
@@ -523,37 +530,6 @@
     }
   }
 
-  /** 
-   * Checks if the job directory is clean and has all the required components 
-   * for (re) starting the job
-   */
-  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) 
-  throws IOException {
-    FileStatus[] contents = null;
-    
-    try {
-      contents = fs.listStatus(jobDirPath);
-    } catch(FileNotFoundException fnfe) {
-      return false;
-    }
-    
-    int matchCount = 0;
-    if (contents.length >=2) {
-      for (FileStatus status : contents) {
-        if ("job.xml".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
-        if ("job.split".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
-      }
-      if (matchCount == 2) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * Get an {@link RunningJob} object to track an ongoing job.  Returns
    * null if the id does not correspond to any known job.
@@ -1001,8 +977,12 @@
   
   public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
     try {
+      QueueInfo queue = cluster.getQueue(queueName);
+      if (queue == null) {
+        return null;
+      }
       org.apache.hadoop.mapreduce.JobStatus[] stats = 
-        cluster.getQueue(queueName).getJobStatuses();
+        queue.getJobStatuses();
       JobStatus[] ret = new JobStatus[stats.length];
       for (int i = 0 ; i < stats.length; i++ ) {
         ret[i] = JobStatus.downgrade(stats[i]);
@@ -1022,7 +1002,11 @@
    */
   public JobQueueInfo getQueueInfo(String queueName) throws IOException {
     try {
-      return new JobQueueInfo(cluster.getQueue(queueName));
+      QueueInfo queueInfo = cluster.getQueue(queueName);
+      if (queueInfo != null) {
+        return new JobQueueInfo(queueInfo);
+      }
+      return null;
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }

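getJobsFromQueue and getQueueInfo now return null for a queue the cluster does not know about, rather than dereferencing a null QueueInfo. Callers therefore need a null check; a hedged sketch of what that looks like (the class and queue name are placeholders):

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobQueueInfo;
    import org.apache.hadoop.mapred.JobStatus;

    // Illustrative caller: an unknown queue now yields null instead of an NPE.
    class QueueLookupSketch {
      static void describeQueue(JobConf conf, String queueName) throws IOException {
        JobClient client = new JobClient(conf);
        JobQueueInfo info = client.getQueueInfo(queueName);
        if (info == null) {
          System.out.println("No such queue: " + queueName);
          return;
        }
        JobStatus[] jobs = client.getJobsFromQueue(queueName);
        System.out.println(queueName + " has "
            + (jobs == null ? 0 : jobs.length) + " job(s)");
      }
    }
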
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -165,6 +166,10 @@
   static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
     JobContext.REDUCE_MEMORY_MB;
 
+  /** Pattern for the default unpacking behavior for job jars */
+  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
+    Pattern.compile("(?:classes/|lib/).*");
+
   /**
    * Configuration key to set the java command line options for the child
    * map and reduce tasks.
@@ -418,6 +423,14 @@
    * @param jar the user jar for the map-reduce job.
    */
   public void setJar(String jar) { set(JobContext.JAR, jar); }
+
+  /**
+   * Get the pattern for jar contents to unpack on the tasktracker
+   */
+  public Pattern getJarUnpackPattern() {
+    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
+  }
+
   
   /**
    * Set the job's jar file by finding an example class location.
@@ -435,6 +448,11 @@
     return getStrings(MRConfig.LOCAL_DIR);
   }
 
+  /**
+   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
+   * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
+   */
+  @Deprecated
   public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {

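getJarUnpackPattern() falls back to UNPACK_JAR_PATTERN_DEFAULT, so by default only the classes/ and lib/ entries of the job jar are unpacked on the tasktracker. The pattern is plain java.util.regex and can be checked standalone; a quick sketch with hypothetical jar entry names:

    import java.util.regex.Pattern;

    // Illustrative: entries under classes/ and lib/ match the default unpack
    // pattern; everything else in the job jar stays packed.
    class UnpackPatternSketch {
      public static void main(String[] args) {
        Pattern p = Pattern.compile("(?:classes/|lib/).*");
        String[] entries = { "classes/org/example/MyMapper.class",
                             "lib/some-dependency.jar",
                             "META-INF/MANIFEST.MF" };
        for (String entry : entries) {
          System.out.println(entry + " -> " + p.matcher(entry).matches());
        }
      }
    }
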

