hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r726850 [4/4] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/h...
Date: Mon, 15 Dec 2008 22:21:35 GMT
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java Mon Dec 15 14:21:32 2008
@@ -35,6 +35,8 @@
   private long start;
   private long end;
   private boolean more = true;
+  private K key = null;
+  private V value = null;
   protected Configuration conf;
   
   @Override
@@ -58,23 +60,30 @@
 
   @Override
   @SuppressWarnings("unchecked")
-  public K nextKey(K key) throws IOException, InterruptedException {
+  public boolean nextKeyValue() throws IOException, InterruptedException {
     if (!more) {
-      return null;
+      return false;
     }
     long pos = in.getPosition();
-    K result = (K) in.next(key);
-    if (result == null || (pos >= end && in.syncSeen())) {
+    key = (K) in.next(key);
+    if (key == null || (pos >= end && in.syncSeen())) {
       more = false;
-      result = null;
+      key = null;
+      value = null;
+    } else {
+      value = (V) in.getCurrentValue(value);
     }
-    return result;
+    return more;
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public V nextValue(V value) throws IOException, InterruptedException {
-    return (V) in.getCurrentValue(value);
+  public K getCurrentKey() {
+    return key;
+  }
+  
+  @Override
+  public V getCurrentValue() {
+    return value;
   }
   
   /**

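The hunk above replaces the pull-style nextKey(K)/nextValue(V) pair with the nextKeyValue()/getCurrentKey()/getCurrentValue() contract: the reader now owns the current key and value objects and the caller simply iterates. A minimal consumer loop, sketched against that contract (reader stands for any initialized RecordReader<K, V>; process(...) is a placeholder for application code):

// Illustrative only: the framework normally drives this loop from Mapper.run().
while (reader.nextKeyValue()) {
  K key = reader.getCurrentKey();      // valid until the next nextKeyValue() call
  V value = reader.getCurrentValue();  // the reader may reuse these objects between calls
  process(key, value);
}
reader.close();
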
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java Mon Dec 15 14:21:32 2008
@@ -26,9 +26,10 @@
 public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
 
   /** The inverse function.  Input keys and values are swapped.*/
+  @Override
   public void map(K key, V value, Context context
                   ) throws IOException, InterruptedException {
-    context.collect(value, key);
+    context.write(value, key);
   }
   
 }

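The Context.collect(key, value) call is renamed to Context.write(key, value) throughout this change set. A short sketch of a user mapper written against the updated API (the class name and key choice below are illustrative only):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: emits each input line keyed by its length in bytes.
public class LineLengthMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
  private final LongWritable length = new LongWritable();

  @Override
  public void map(LongWritable key, Text value, Context context
                  ) throws IOException, InterruptedException {
    length.set(value.getLength());
    context.write(length, value);   // was context.collect(...) before this commit
  }
}
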
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Mon Dec 15 14:21:32 2008
@@ -23,10 +23,16 @@
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Multithreaded implementation for @link org.apache.hadoop.mapreduce.Mapper.
@@ -50,7 +56,7 @@
   private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
   private Class<Mapper<K1,V1,K2,V2>> mapClass;
   private Context outer;
-  private MapRunner[] runners;
+  private List<MapRunner> runners;
 
   public static int getNumberOfThreads(Configuration conf) {
     return conf.getInt("mapred.map.multithreadedrunner.threads", 10);
@@ -78,6 +84,7 @@
     conf.setClass("mapred.map.multithreadedrunner.class", cls, Mapper.class);
   }
 
+  @Override
   public void run(Context context) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
     outer = context;
@@ -88,14 +95,16 @@
                 " threads");
     }
     
-    runners = (MapRunner[]) new Object[numberOfThreads];
+    runners =  new ArrayList<MapRunner>(numberOfThreads);
     for(int i=0; i < numberOfThreads; ++i) {
-      runners[i] = new MapRunner();
-      runners[i].start();
+      MapRunner thread = new MapRunner(context);
+      thread.start();
+      runners.set(i, thread);
     }
     for(int i=0; i < numberOfThreads; ++i) {
-      runners[i].join();
-      Throwable th = runners[i].throwable;
+      MapRunner thread = runners.get(i);
+      thread.join();
+      Throwable th = thread.throwable;
       if (th != null) {
         if (th instanceof IOException) {
           throw (IOException) th;
@@ -108,85 +117,116 @@
     }
   }
 
-  private class SubMapContext extends Context {
+  private class SubMapRecordReader extends RecordReader<K1,V1> {
     private K1 key;
     private V1 value;
-    
-    SubMapContext() {
-      super(outer.getConfiguration(), outer.getTaskAttemptId());
+    private Configuration conf;
+
+    @Override
+    public void close() throws IOException {
     }
 
     @Override
-    public InputSplit getInputSplit() {
-      synchronized (outer) {
-        return outer.getInputSplit();
-      }
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
     }
 
     @Override
-    public Counter getCounter(Enum<?> counterName) {
-      synchronized (outer) {
-        return outer.getCounter(counterName);
-      }
+    public void initialize(InputSplit split, 
+                           TaskAttemptContext context
+                           ) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
     }
 
+
     @Override
-    public Counter getCounter(String groupName, String counterName) {
+    public boolean nextKeyValue() throws IOException, InterruptedException {
       synchronized (outer) {
-        return outer.getCounter(groupName, counterName);
+        if (!outer.nextKeyValue()) {
+          return false;
+        }
+        key = ReflectionUtils.copy(outer.getConfiguration(),
+                                   outer.getCurrentKey(), key);
+        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
+        return true;
       }
     }
 
+    public K1 getCurrentKey() {
+      return key;
+    }
+
     @Override
-    public void progress() {
-      synchronized (outer) {
-        outer.progress();
-      }
+    public V1 getCurrentValue() {
+      return value;
     }
+  }
+  
+  private class SubMapRecordWriter extends RecordWriter<K2,V2> {
 
     @Override
-    public void collect(K2 key, V2 value) throws IOException,
-                                         InterruptedException {
-      synchronized (outer) {
-        outer.collect(key, value);
-      }
+    public void close(TaskAttemptContext context) throws IOException,
+                                                 InterruptedException {
     }
 
     @Override
-    public K1 nextKey(K1 k) throws IOException, InterruptedException {
+    public void write(K2 key, V2 value) throws IOException,
+                                               InterruptedException {
       synchronized (outer) {
-        key = outer.nextKey(key);
-        if (key != null) {
-          value = outer.nextValue(value);
-        }
-        return key;
+        outer.write(key, value);
       }
+    }  
+  }
+
+  private class SubMapStatusReporter extends StatusReporter {
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return outer.getCounter(name);
     }
-    
-    public V1 nextValue(V1 v) throws IOException, InterruptedException {
-      return value;
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return outer.getCounter(group, name);
     }
+
+    @Override
+    public void progress() {
+      outer.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      outer.setStatus(status);
+    }
+    
   }
 
   private class MapRunner extends Thread {
     private Mapper<K1,V1,K2,V2> mapper;
-    private Context context;
+    private Context subcontext;
     private Throwable throwable;
 
-    @SuppressWarnings("unchecked")
-    MapRunner() {
-      mapper = (Mapper<K1,V1,K2,V2>) 
-        ReflectionUtils.newInstance(mapClass, context.getConfiguration());
-      context = new SubMapContext();
+    MapRunner(Context context) throws IOException, InterruptedException {
+      mapper = ReflectionUtils.newInstance(mapClass, 
+                                           context.getConfiguration());
+      subcontext = new Context(outer.getConfiguration(), 
+                            outer.getTaskAttemptID(),
+                            new SubMapRecordReader(),
+                            new SubMapRecordWriter(), 
+                            context.getOutputCommitter(),
+                            new SubMapStatusReporter(),
+                            outer.getInputSplit());
     }
 
     public Throwable getThrowable() {
       return throwable;
     }
 
+    @Override
     public void run() {
       try {
-        mapper.run(context);
+        mapper.run(subcontext);
       } catch (Throwable ie) {
         throwable = ie;
       }

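MultithreadedMapper now wraps the real task context in per-thread Context objects built from a synchronized SubMapRecordReader and SubMapRecordWriter, copying each key and value with ReflectionUtils.copy so the threads never share the reusable objects handed out by the outer reader. One caveat: runners.set(i, thread) is invoked on a freshly constructed, empty ArrayList, which would throw IndexOutOfBoundsException at runtime; runners.add(thread) appears to be the intent. A hedged driver-side sketch using the configuration keys this class reads (MyMapper stands for the application's real mapper; the thread count is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

// In a job driver:
Configuration conf = new Configuration();
conf.setInt("mapred.map.multithreadedrunner.threads", 4);
conf.setClass("mapred.map.multithreadedrunner.class", MyMapper.class, Mapper.class);

Job job = new Job(conf, "multithreaded example");
job.setMapperClass(MultithreadedMapper.class);   // MultithreadedMapper fans work out to MyMapper
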
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java Mon Dec 15 14:21:32 2008
@@ -29,13 +29,14 @@
     
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
-    
+  
+  @Override
   public void map(Object key, Text value, Context context
                   ) throws IOException, InterruptedException {
     StringTokenizer itr = new StringTokenizer(value.toString());
     while (itr.hasMoreTokens()) {
       word.set(itr.nextToken());
-      context.collect(word, one);
+      context.write(word, one);
     }
   }
 }

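With the write() rename in place, TokenCounterMapper combined with the library IntSumReducer from lib/reduce can stand in for a hand-written word count. A driver sketch built only from calls that appear elsewhere in this commit (input and output paths come from the command line; the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class LibraryWordCount {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "library word count");
    job.setJarByClass(LibraryWordCount.class);
    job.setMapperClass(TokenCounterMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion() ? 0 : 1);
  }
}
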
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.StringUtils;
+
+/** An {@link OutputCommitter} that commits files specified 
+ * in job output directory i.e. ${mapred.output.dir}. 
+ **/
+public class FileOutputCommitter extends OutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
+
+  /**
+   * Temporary directory name 
+   */
+  protected static final String TEMP_DIR_NAME = "_temporary";
+  private FileSystem outputFileSystem = null;
+  private Path outputPath = null;
+  private Path workPath = null;
+
+  public FileOutputCommitter(Path outputPath, 
+                             TaskAttemptContext context) throws IOException {
+    if (outputPath != null) {
+      this.outputPath = outputPath;
+      outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
+      workPath = new Path(outputPath,
+                          (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+                           "_" + context.getTaskAttemptID().toString()
+                           )).makeQualified(outputFileSystem);
+    }
+  }
+
+  public void setupJob(JobContext context) throws IOException {
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+      if (!fileSys.mkdirs(tmpDir)) {
+        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+      }
+    }
+  }
+
+  public void cleanupJob(JobContext context) throws IOException {
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+      if (fileSys.exists(tmpDir)) {
+        fileSys.delete(tmpDir, true);
+      }
+    }
+  }
+
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    // FileOutputCommitter's setupTask doesn't do anything. Because the
+    // temporary task directory is created on demand when the 
+    // task is writing.
+  }
+		  
+  public void commitTask(TaskAttemptContext context) 
+  throws IOException {
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    if (workPath != null) {
+      context.progress();
+      if (outputFileSystem.exists(workPath)) {
+        // Move the task outputs to their final place
+        moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
+        // Delete the temporary task-specific output directory
+        if (!outputFileSystem.delete(workPath, true)) {
+          LOG.warn("Failed to delete the temporary output" + 
+          " directory of task: " + attemptId + " - " + workPath);
+        }
+        LOG.info("Saved output of task '" + attemptId + "' to " + 
+                 outputPath);
+      }
+    }
+  }
+		  
+  private void moveTaskOutputs(TaskAttemptContext context,
+                               FileSystem fs,
+                               Path jobOutputDir,
+                               Path taskOutput) 
+  throws IOException {
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    context.progress();
+    if (fs.isFile(taskOutput)) {
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+                                          workPath);
+      if (!fs.rename(taskOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of task: " + 
+                                 attemptId);
+        }
+        if (!fs.rename(taskOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of task: " + 
+        		  attemptId);
+        }
+      }
+      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
+    } else if(fs.getFileStatus(taskOutput).isDir()) {
+      FileStatus[] paths = fs.listStatus(taskOutput);
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+
+  public void abortTask(TaskAttemptContext context) {
+    try {
+      context.progress();
+      outputFileSystem.delete(workPath, true);
+    } catch (IOException ie) {
+      LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
+    }
+  }
+
+  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
+                            Path taskOutputPath) throws IOException {
+    URI taskOutputUri = taskOutput.toUri();
+    URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+    if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+      throw new IOException("Can not get the relative path: base = " + 
+          taskOutputPath + " child = " + taskOutput);
+    }
+    if (relativePath.getPath().length() > 0) {
+      return new Path(jobOutputDir, relativePath.getPath());
+    } else {
+      return jobOutputDir;
+    }
+  }
+
+  public boolean needsTaskCommit(TaskAttemptContext context
+                                 ) throws IOException {
+    return workPath != null && outputFileSystem.exists(workPath);
+  }
+
+  /**
+   * Get the directory that the task should write results into
+   * @return the work directory
+   * @throws IOException
+   */
+  public Path getWorkPath() throws IOException {
+    return workPath;
+  }
+}

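FileOutputCommitter implements the usual two-phase commit for file output: setupJob() creates ${mapred.output.dir}/_temporary, each task attempt writes under _temporary/_<task-attempt-id> (the work path), commitTask() promotes those files into the job output directory, abortTask() discards them, and cleanupJob() removes _temporary. The call order below is an illustrative sketch only; the MapReduce framework normally issues these calls, and jobContext, taskContext and outputPath are assumed to exist:

FileOutputCommitter committer = new FileOutputCommitter(outputPath, taskContext);

committer.setupJob(jobContext);          // creates ${outputPath}/_temporary
committer.setupTask(taskContext);        // intentionally a no-op: the work dir is created on demand

Path workDir = committer.getWorkPath();  // _temporary/_<task-attempt-id>; the task writes here

if (committer.needsTaskCommit(taskContext)) {
  committer.commitTask(taskContext);     // moves the work dir contents under outputPath
} else {
  committer.abortTask(taskContext);      // deletes the work dir
}
committer.cleanupJob(jobContext);        // removes the _temporary directory
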
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -27,22 +27,34 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
 
-  private static final String TEMP_DIR_NAME = "_temp";
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.*/
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+  private FileOutputCommitter committer = null;
+
   /**
    * Set whether the output of the job is compressed.
-   * @param conf the {@link Configuration} to modify
+   * @param job the job to modify
    * @param compress should the output of the job be compressed?
    */
-  public static void setCompressOutput(Configuration conf, boolean compress) {
-    conf.setBoolean("mapred.output.compress", compress);
+  public static void setCompressOutput(Job job, boolean compress) {
+    job.getConfiguration().setBoolean("mapred.output.compress", compress);
   }
   
   /**
@@ -57,16 +69,17 @@
   
   /**
    * Set the {@link CompressionCodec} to be used to compress job outputs.
-   * @param conf the {@link Configuration} to modify
+   * @param job the job to modify
    * @param codecClass the {@link CompressionCodec} to be used to
    *                   compress the job outputs
    */
   public static void 
-  setOutputCompressorClass(Configuration conf, 
+  setOutputCompressorClass(Job job, 
                            Class<? extends CompressionCodec> codecClass) {
-    setCompressOutput(conf, true);
-    conf.setClass("mapred.output.compression.codec", codecClass, 
-                  CompressionCodec.class);
+    setCompressOutput(job, true);
+    job.getConfiguration().setClass("mapred.output.compression.codec", 
+                                    codecClass, 
+                                    CompressionCodec.class);
   }
   
   /**
@@ -95,20 +108,19 @@
     return codecClass;
   }
   
-  public abstract 
-    RecordWriter<K, V> getRecordWriter(TaskAttemptContext context
-                                       ) throws IOException;
-
-  public void checkOutputSpecs(JobContext context) 
-    throws FileAlreadyExistsException, 
-           InvalidJobConfException, IOException {
+  public abstract RecordWriter<K, V> 
+     getRecordWriter(TaskAttemptContext context
+                     ) throws IOException, InterruptedException;
+
+  public void checkOutputSpecs(JobContext context
+                               ) throws FileAlreadyExistsException, IOException{
     // Ensure that the output directory is set and not already there
     Configuration job = context.getConfiguration();
     Path outDir = getOutputPath(job);
-    if (outDir == null && context.getNumReduceTasks() != 0) {
-      throw new InvalidJobConfException("Output directory not set in JobConf.");
+    if (outDir == null) {
+      throw new InvalidJobConfException("Output directory not set.");
     }
-    if (outDir != null && outDir.getFileSystem(job).exists(outDir)) {
+    if (outDir.getFileSystem(job).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 
                                            " already exists");
     }
@@ -117,19 +129,19 @@
   /**
    * Set the {@link Path} of the output directory for the map-reduce job.
    *
-   * @param conf The configuration of the job.
+   * @param job The job to modify
    * @param outputDir the {@link Path} of the output directory for 
    * the map-reduce job.
    */
-  public static void setOutputPath(Configuration conf, Path outputDir) {
-    conf.set("mapred.output.dir", outputDir.toString());
+  public static void setOutputPath(Job job, Path outputDir) {
+    job.getConfiguration().set("mapred.output.dir", outputDir.toString());
   }
 
   /**
    * Get the {@link Path} to the output directory for the map-reduce job.
    * 
    * @return the {@link Path} to the output directory for the map-reduce job.
-   * @see FileOutputFormat#getWorkOutputPath(Configuration)
+   * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
    */
   public static Path getOutputPath(Configuration conf) {
     String name = conf.get("mapred.output.dir");
@@ -162,18 +174,12 @@
    * is completely transparent to the application.</p>
    * 
    * <p>The application-writer can take advantage of this by creating any 
-   * side-files required in <tt>${mapred.work.output.dir}</tt> during execution 
-   * of his reduce-task i.e. via {@link #getWorkOutputPath(Configuration)}, and
+   * side-files required in a work directory during execution 
+   * of his task i.e. via 
+   * {@link #getWorkOutputPath(TaskInputOutputContext)}, and
    * the framework will move them out similarly - thus she doesn't have to pick 
    * unique paths per task-attempt.</p>
    * 
-   * <p><i>Note</i>: the value of <tt>${mapred.work.output.dir}</tt> during 
-   * execution of a particular task-attempt is actually 
-   * <tt>${mapred.output.dir}/_temporary/_{$taskid}</tt>, and this value is 
-   * set by the map-reduce framework. So, just create any side-files in the 
-   * path  returned by {@link #getWorkOutputPath(Configuration)} from map/reduce 
-   * task to take advantage of this feature.</p>
-   * 
    * <p>The entire discussion holds true for maps of jobs with 
    * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
    * goes directly to HDFS.</p> 
@@ -181,77 +187,12 @@
    * @return the {@link Path} to the task's temporary output directory 
    * for the map-reduce job.
    */
-  public static Path getWorkOutputPath(Configuration conf) {
-    String name = conf.get("mapred.work.output.dir");
-    return name == null ? null: new Path(name);
-  }
-
-  /**
-   * Helper function to create the task's temporary output directory and 
-   * return the path to the task's output file.
-   * 
-   * @param context the task's context
-   * @return path to the task's temporary output file
-   * @throws IOException
-   */
-  protected static Path getTaskOutputPath(TaskAttemptContext context
-                                          ) throws IOException {
-    // ${mapred.job.dir}
-    Configuration conf = context.getConfiguration();
-    Path outputPath = getOutputPath(conf);
-    if (outputPath == null) {
-      throw new IOException("Undefined job output-path");
-    }
-
-    // ${mapred.out.dir}/_temporary
-    Path jobTmpDir = new Path(outputPath, TEMP_DIR_NAME);
-    FileSystem fs = jobTmpDir.getFileSystem(conf);
-    if (!fs.exists(jobTmpDir)) {
-      throw new IOException("The temporary job-output directory " + 
-          jobTmpDir.toString() + " doesn't exist!"); 
-    }
-
-    // ${mapred.out.dir}/_temporary/_${taskid}
-    Path taskTmpDir = getWorkOutputPath(conf);
-    if (!fs.mkdirs(taskTmpDir)) {
-      throw new IOException("Mkdirs failed to create " 
-          + taskTmpDir.toString());
-    }
-    
-    // ${mapred.out.dir}/_temporary/_${taskid}/${name}
-    return new Path(taskTmpDir, getOutputName(context));
-  } 
-
-  /**
-   * Helper function to generate a name that is unique for the task.
-   *
-   * <p>The generated name can be used to create custom files from within the
-   * different tasks for the job, the names for different tasks will not collide
-   * with each other.</p>
-   *
-   * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
-   * reduces and the task partition number. For example, give a name 'test'
-   * running on the first map o the job the generated name will be
-   * 'test-m-00000'.</p>
-   *
-   * @param conf the configuration for the job.
-   * @param name the name to make unique.
-   * @return a unique name accross all tasks of the job.
-   */
-  public static String getUniqueName(Configuration conf, String name) {
-    int partition = conf.getInt("mapred.task.partition", -1);
-    if (partition == -1) {
-      throw new IllegalArgumentException(
-        "This method can only be called from within a Job");
-    }
-
-    String taskType = (conf.getBoolean("mapred.task.is.map", true)) ? "m" : "r";
-
-    NumberFormat numberFormat = NumberFormat.getInstance();
-    numberFormat.setMinimumIntegerDigits(5);
-    numberFormat.setGroupingUsed(false);
-
-    return name + "-" + taskType + "-" + numberFormat.format(partition);
+  public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
+                                       ) throws IOException, 
+                                                InterruptedException {
+    FileOutputCommitter committer = (FileOutputCommitter) 
+      context.getOutputCommitter();
+    return committer.getWorkPath();
   }
 
   /**
@@ -262,28 +203,68 @@
    * reduce tasks. The path name will be unique for each task. The path parent
    * will be the job output directory.</p>ls
    *
-   * <p>This method uses the {@link #getUniqueName} method to make the file name
+   * <p>This method uses the {@link #getUniqueFile} method to make the file name
    * unique for the task.</p>
    *
-   * @param conf the configuration for the job.
+   * @param context the context for the task.
    * @param name the name for the file.
+   * @param extension the extension for the file
    * @return a unique path accross all tasks of the job.
    */
-  public static Path getPathForCustomFile(Configuration conf, String name) {
-    return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
+  public 
+  static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context, 
+                                 String name,
+                                 String extension
+                                ) throws IOException, InterruptedException {
+    return new Path(getWorkOutputPath(context),
+                    getUniqueFile(context, name, extension));
   }
 
-  /** Construct output file names so that, when an output directory listing is
-   * sorted lexicographically, positions correspond to output partitions.*/
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
+  /**
+   * Generate a unique filename, based on the task id, name, and extension
+   * @param context the task that is calling this
+   * @param name the base filename
+   * @param extension the filename extension
+   * @return a string like $name-[mr]-$id$extension
+   */
+  public synchronized static String getUniqueFile(TaskAttemptContext context,
+                                                  String name,
+                                                  String extension) {
+    TaskID taskId = context.getTaskAttemptID().getTaskID();
+    int partition = taskId.getId();
+    StringBuilder result = new StringBuilder();
+    result.append(name);
+    result.append('-');
+    result.append(taskId.isMap() ? 'm' : 'r');
+    result.append('-');
+    result.append(NUMBER_FORMAT.format(partition));
+    result.append(extension);
+    return result.toString();
   }
 
-  protected static synchronized 
-  String getOutputName(TaskAttemptContext context) {
-    return "part-" + NUMBER_FORMAT.format(context.getTaskAttemptId().getId());
+  /**
+   * Get the default path and filename for the output format.
+   * @param context the task context
+   * @param extension an extension to add to the filename
+   * @return a full path $output/_temporary/$taskid/part-[mr]-$id
+   * @throws IOException
+   */
+  public Path getDefaultWorkFile(TaskAttemptContext context,
+                                 String extension) throws IOException{
+    FileOutputCommitter committer = 
+      (FileOutputCommitter) getOutputCommitter(context);
+    return new Path(committer.getWorkPath(), getUniqueFile(context, "part", 
+                                                           extension));
+  }
+
+  public synchronized 
+     OutputCommitter getOutputCommitter(TaskAttemptContext context
+                                        ) throws IOException {
+    if (committer == null) {
+      Path output = getOutputPath(context.getConfiguration());
+      committer = new FileOutputCommitter(output, context);
+    }
+    return committer;
   }
 }
 

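The static setters on FileOutputFormat now take a Job instead of a raw Configuration, and the removed getTaskOutputPath()/getUniqueName() helpers are superseded by the committer-backed getDefaultWorkFile() and getUniqueFile() (the SequenceFileOutputFormat change below shows the record-writer side of that). Driver-side usage, sketched against the signatures above ('job' is an already constructed Job; the output path and GzipCodec are illustrative choices):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// In job setup:
FileOutputFormat.setOutputPath(job, new Path("/user/example/out"));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
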
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.lib.output;
 
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -28,12 +29,29 @@
  */
 public class NullOutputFormat<K, V> extends OutputFormat<K, V> {
   
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
+  @Override
+  public RecordWriter<K, V> 
+         getRecordWriter(TaskAttemptContext context) {
     return new RecordWriter<K, V>(){
         public void write(K key, V value) { }
         public void close(TaskAttemptContext context) { }
       };
   }
   
+  @Override
   public void checkOutputSpecs(JobContext context) { }
+  
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return new OutputCommitter() {
+      public void abortTask(TaskAttemptContext taskContext) { }
+      public void cleanupJob(JobContext jobContext) { }
+      public void commitTask(TaskAttemptContext taskContext) { }
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+      public void setupJob(JobContext jobContext) { }
+      public void setupTask(TaskAttemptContext taskContext) { }
+    };
+  }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -36,13 +37,11 @@
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
 public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
 
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-    throws IOException {
-    // get the path of the temporary output file 
-    Path file = FileOutputFormat.getTaskOutputPath(context);
+  public RecordWriter<K, V> 
+         getRecordWriter(TaskAttemptContext context
+                         ) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
     
-    FileSystem fs = file.getFileSystem(conf);
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(conf)) {
@@ -54,6 +53,9 @@
       codec = (CompressionCodec) 
         ReflectionUtils.newInstance(codecClass, conf);
     }
+    // get the path of the temporary output file 
+    Path file = getDefaultWorkFile(context, "");
+    FileSystem fs = file.getFileSystem(conf);
     final SequenceFile.Writer out = 
       SequenceFile.createWriter(fs, conf, file,
                                 context.getOutputKeyClass(),
@@ -90,14 +92,15 @@
   
   /**
    * Set the {@link CompressionType} for the output {@link SequenceFile}.
-   * @param conf the {@link Configuration} to modify
+   * @param job the {@link Job} to modify
    * @param style the {@link CompressionType} for the output
    *              {@link SequenceFile} 
    */
-  public static void setOutputCompressionType(Configuration conf, 
+  public static void setOutputCompressionType(Job job, 
 		                                          CompressionType style) {
-    setCompressOutput(conf, true);
-    conf.set("mapred.output.compression.type", style.toString());
+    setCompressOutput(job, true);
+    job.getConfiguration().set("mapred.output.compression.type", 
+                               style.toString());
   }
 
 }

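setOutputCompressionType() moves to the Job-based form as well. For example ('job' as in the earlier sketches; BLOCK compression is an arbitrary choice among NONE, RECORD and BLOCK):

import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
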
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java Mon Dec 15 14:21:32 2008
@@ -38,9 +38,8 @@
 
 /** An {@link OutputFormat} that writes plain text files. */
 public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
-
   protected static class LineRecordWriter<K, V>
-    implements RecordWriter<K, V> {
+    extends RecordWriter<K, V> {
     private static final String utf8 = "UTF-8";
     private static final byte[] newline;
     static {
@@ -108,26 +107,26 @@
     }
   }
 
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-    throws IOException {
+  public RecordWriter<K, V> 
+         getRecordWriter(TaskAttemptContext context
+                         ) throws IOException, InterruptedException {
     Configuration job = context.getConfiguration();
     boolean isCompressed = getCompressOutput(job);
-    String keyValueSeparator = job.get("mapred.textoutputformat.separator", 
-                                       "\t");
-    Path file = FileOutputFormat.getTaskOutputPath(context);
+    String keyValueSeparator= job.get("mapred.textoutputformat.separator","\t");
+    CompressionCodec codec = null;
+    String extension = "";
+    if (isCompressed) {
+      Class<? extends CompressionCodec> codecClass = 
+        getOutputCompressorClass(job, GzipCodec.class);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
+      extension = codec.getDefaultExtension();
+    }
+    Path file = getDefaultWorkFile(context, extension);
+    FileSystem fs = file.getFileSystem(job);
     if (!isCompressed) {
-      FileSystem fs = file.getFileSystem(job);
       FSDataOutputStream fileOut = fs.create(file, context);
       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
     } else {
-      Class<? extends CompressionCodec> codecClass =
-        getOutputCompressorClass(job, GzipCodec.class);
-      // create the named codec
-      CompressionCodec codec = (CompressionCodec)
-        ReflectionUtils.newInstance(codecClass, job);
-      // build the filename including the extension
-      file = new Path(file + codec.getDefaultExtension());
-      FileSystem fs = file.getFileSystem(job);
       FSDataOutputStream fileOut = fs.create(file, context);
       return new LineRecordWriter<K, V>(new DataOutputStream
                                         (codec.createOutputStream(fileOut)),

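TextOutputFormat now obtains its output file from getDefaultWorkFile() too, appending the codec's default extension when compression is enabled. The key/value separator is still read from "mapred.textoutputformat.separator", so a driver can override the tab default, for example:

// In job setup; the comma is purely illustrative.
job.getConfiguration().set("mapred.textoutputformat.separator", ",");
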
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java Mon Dec 15 14:21:32 2008
@@ -34,7 +34,7 @@
       sum += val.get();
     }
     result.set(sum);
-    context.collect(key, result);
+    context.write(key, result);
   }
 
 }
\ No newline at end of file

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java Mon Dec 15 14:21:32 2008
@@ -34,7 +34,7 @@
       sum += val.get();
     }
     result.set(sum);
-    context.collect(key, result);
+    context.write(key, result);
   }
 
 }
\ No newline at end of file

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java Mon Dec 15 14:21:32 2008
@@ -25,7 +25,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.examples.WordCount;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java Mon Dec 15 14:21:32 2008
@@ -37,9 +37,9 @@
     JobConf job = new JobConf();
     job.set("mapred.task.id", attempt);
     job.setOutputCommitter(FileOutputCommitter.class);
-    JobContext jContext = new JobContext(job);
-    TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
     FileOutputFormat.setOutputPath(job, outDir);
+    JobContext jContext = new JobContext(job, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, 
       committer.getTempTaskOutputPath(tContext));

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Mon Dec 15 14:21:32 2008
@@ -26,7 +26,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java Mon Dec 15 14:21:32 2008
@@ -28,7 +28,6 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 
-import org.apache.hadoop.examples.WordCount;
 
 
 /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Dec 15 14:21:32 2008
@@ -34,7 +34,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java Mon Dec 15 14:21:32 2008
@@ -27,7 +27,6 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Mon Dec 15 14:21:32 2008
@@ -108,7 +108,7 @@
 
     for (TaskCompletionEvent tce : taskComplEvents) {
       String[] diagnostics =
-          jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
+          rj.getTaskDiagnostics(tce.getTaskAttemptId());
 
       if (diagnostics != null) {
         for (String str : diagnostics) {
@@ -304,7 +304,7 @@
           .getTaskStatus() == TaskCompletionEvent.Status.FAILED);
 
       String[] diagnostics =
-          jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
+          rj.getTaskDiagnostics(tce.getTaskAttemptId());
 
       // Every task HAS to spit out the out-of-memory errors
       assert (diagnostics != null);

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This is an example Hadoop Map/Reduce application.
+ * It reads the text input files, breaks each line into words
+ * and counts them. The output is a locally sorted list of words and the 
+ * count of how often they occurred.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar wordcount
+ *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> 
+ */
+public class WordCount extends Configured implements Tool {
+  
+  /**
+   * Counts the words in each line.
+   * For each line of input, break the line into words and emit them as
+   * (<b>word</b>, <b>1</b>).
+   */
+  public static class MapClass extends MapReduceBase
+    implements Mapper<LongWritable, Text, Text, IntWritable> {
+    
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    
+    public void map(LongWritable key, Text value, 
+                    OutputCollector<Text, IntWritable> output, 
+                    Reporter reporter) throws IOException {
+      String line = value.toString();
+      StringTokenizer itr = new StringTokenizer(line);
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+  
+  /**
+   * A reducer class that just emits the sum of the input values.
+   */
+  public static class Reduce extends MapReduceBase
+    implements Reducer<Text, IntWritable, Text, IntWritable> {
+    
+    public void reduce(Text key, Iterator<IntWritable> values,
+                       OutputCollector<Text, IntWritable> output, 
+                       Reporter reporter) throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(key, new IntWritable(sum));
+    }
+  }
+  
+  static int printUsage() {
+    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+  
+  /**
+   * The main driver for word count map/reduce program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there is communication problems with the 
+   *                     job tracker.
+   */
+  public int run(String[] args) throws Exception {
+    JobConf conf = new JobConf(getConf(), WordCount.class);
+    conf.setJobName("wordcount");
+ 
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+    
+    conf.setMapperClass(MapClass.class);        
+    conf.setCombinerClass(Reduce.class);
+    conf.setReducerClass(Reduce.class);
+    
+    List<String> other_args = new ArrayList<String>();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          conf.setNumMapTasks(Integer.parseInt(args[++i]));
+        } else if ("-r".equals(args[i])) {
+          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else {
+          other_args.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+                           args[i-1]);
+        return printUsage();
+      }
+    }
+    // Make sure there are exactly 2 parameters left.
+    if (other_args.size() != 2) {
+      System.out.println("ERROR: Wrong number of parameters: " +
+                         other_args.size() + " instead of 2.");
+      return printUsage();
+    }
+    FileInputFormat.setInputPaths(conf, other_args.get(0));
+    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
+        
+    JobClient.runJob(conf);
+    return 0;
+  }
+  
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
+    System.exit(res);
+  }
+
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.examples.WordCount.IntSumReducer;
+import org.apache.hadoop.examples.WordCount.TokenizerMapper;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * A JUnit test to test min map-reduce cluster with local file system.
+ */
+public class TestMapReduceLocal extends TestCase {
+  private static Path TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"));
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  public Path writeFile(String name, String data) throws IOException {
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    localFs.delete(file, false);
+    DataOutputStream f = localFs.create(file);
+    f.write(data.getBytes());
+    f.close();
+    return file;
+  }
+
+  public String readFile(String name) throws IOException {
+    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+     result.append(line);
+     result.append('\n');
+     line = b.readLine();
+    }
+    return result.toString();
+  }
+
+  public void testWithLocal() throws Exception {
+    MiniMRCluster mr = null;
+    try {
+      mr = new MiniMRCluster(2, "file:///", 3);
+      Configuration conf = mr.createJobConf();
+      writeFile("in/part1", "this is a test\nof word count\n");
+      writeFile("in/part2", "more test");
+      Job job = new Job(conf, "word count");     
+      job.setJarByClass(WordCount.class);
+      job.setMapperClass(TokenizerMapper.class);
+      job.setCombinerClass(IntSumReducer.class);
+      job.setReducerClass(IntSumReducer.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(IntWritable.class);
+      FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+      FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+      assertTrue(job.waitForCompletion());
+      String out = readFile("out/part-r-00000");
+      System.out.println(out);
+      assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
+                   out);
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+  
+}

Modified: hadoop/core/trunk/src/test/testjar/ClassWordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/testjar/ClassWordCount.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/testjar/ClassWordCount.java (original)
+++ hadoop/core/trunk/src/test/testjar/ClassWordCount.java Mon Dec 15 14:21:32 2008
@@ -34,7 +34,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.mapred.WordCount;
 
 /**
  * This is an example Hadoop Map/Reduce application being used for 


