hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r588033 [2/3] - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/...
Date Wed, 24 Oct 2007 21:18:53 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Oct 24 14:18:51 2007
@@ -31,6 +31,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -43,11 +45,63 @@
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
 
-/** A map/reduce job configuration.  This names the {@link Mapper}, combiner
- * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat}, and
- * {@link OutputFormat} implementations to be used.  It also indicates the set
- * of input files, and where the output files should be written. */
+/** 
+ * A map/reduce job configuration.
+ * 
+ * <p><code>JobConf</code> is the primary interface for a user to describe a 
+ * map-reduce job to the Hadoop framework for execution. The framework tries to
+ * faithfully execute the job as-is described by <code>JobConf</code>, however:
+ * <ol>
+ *   <li>
+ *   Some configuration parameters might have been marked as 
+ *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
+ *   final</a> by administrators and hence cannot be altered.
+ *   </li>
+ *   <li>
+ *   While some job parameters are straight-forward to set 
+ *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 
+ *   with the rest of the framework and/or job-configuration and are 
+ *   relatively more complex for the user to control finely 
+ *   (e.g. {@link #setNumMapTasks(int)}).
+ *   </li>
+ * </ol></p>
+ * 
+ * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
+ * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
+ * {@link OutputFormat} implementations to be used etc. It also indicates the 
+ * set of input files ({@link #setInputPath(Path)}/{@link #addInputPath(Path)}), 
+ * and where the output files should be written ({@link #setOutputPath(Path)}).</p>
+ *
+ * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
+ * of the job such as <code>Comparator</code>s to be used, files to be put in  
+ * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
+ * are to be compressed (and how) etc.</p>
+ * 
+ * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
+ * <p><blockquote><pre>
+ *     // Create a new JobConf
+ *     JobConf job = new JobConf(new Configuration(), MyJob.class);
+ *     
+ *     // Specify various job-specific parameters     
+ *     job.setJobName("myjob");
+ *     
+ *     job.setInputPath(new Path("in"));
+ *     job.setOutputPath(new Path("out"));
+ *     
+ *     job.setMapperClass(MyJob.MyMapper.class);
+ *     job.setCombinerClass(MyJob.MyReducer.class);
+ *     job.setReducerClass(MyJob.MyReducer.class);
+ *     
+ *     job.setInputFormat(SequenceFileInputFormat.class);
+ *     job.setOutputFormat(SequenceFileOutputFormat.class);
+ * </pre></blockquote></p>
+ * 
+ * @see JobClient
+ * @see ClusterStatus
+ * @see Tool
+ * @see DistributedCache
+ */
 public class JobConf extends Configuration {
   
   private static final Log LOG = LogFactory.getLog(JobConf.class);
@@ -61,6 +115,7 @@
 
   /** 
    * Construct a map/reduce job configuration.
+   * 
    * @param exampleClass a class whose containing jar is used as the job's jar.
    */
   public JobConf(Class exampleClass) {
@@ -92,7 +147,7 @@
 
   /** Construct a map/reduce configuration.
    *
-   * @param config a Configuration-format XML job description file
+   * @param config a Configuration-format XML job description file.
    */
   public JobConf(String config) {
     this(new Path(config));
@@ -100,7 +155,7 @@
 
   /** Construct a map/reduce configuration.
    *
-   * @param config a Configuration-format XML job description file
+   * @param config a Configuration-format XML job description file.
    */
   public JobConf(Path config) {
     super();
@@ -111,6 +166,7 @@
   /**
    * Checks if <b>mapred-default.xml</b> is on the CLASSPATH, if so
    * it warns the user and loads it as a {@link Configuration} resource.
+   * 
    * @deprecated Remove in hadoop-0.16.0 via HADOOP-1843
    */
   private void checkWarnAndLoadMapredDefault() {
@@ -122,12 +178,24 @@
     }
   }
   
+  /**
+   * Get the user jar for the map-reduce job.
+   * 
+   * @return the user jar for the map-reduce job.
+   */
   public String getJar() { return get("mapred.jar"); }
+  
+  /**
+   * Set the user jar for the map-reduce job.
+   * 
+   * @param jar the user jar for the map-reduce job.
+   */
   public void setJar(String jar) { set("mapred.jar", jar); }
   
   /**
    * Set the job's jar file by finding an example class location.
-   * @param cls the example class
+   * 
+   * @param cls the example class.
    */
   public void setJarByClass(Class cls) {
     String jar = findContainingJar(cls);
@@ -136,6 +204,11 @@
     }   
   }
 
+  /**
+   * Get the system directory where job-specific files are to be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
   public Path getSystemDir() {
     return new Path(get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
   }
@@ -158,23 +231,41 @@
     }
   }
 
-  /** Constructs a local file name.  Files are distributed among configured
-   * local directories.*/
+  /** 
+   * Constructs a local file name. Files are distributed among configured
+   * local directories.
+   */
   public Path getLocalPath(String pathString) throws IOException {
     return getLocalPath("mapred.local.dir", pathString);
   }
 
+  /**
+   * Set the {@link Path} of the input directory for the map-reduce job.
+   * 
+   * @param dir the {@link Path} of the input directory for the map-reduce job.
+   */
   public void setInputPath(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
     set("mapred.input.dir", dir.toString());
   }
 
+  /**
+   * Add a {@link Path} to the list of inputs for the map-reduce job.
+   * 
+   * @param dir {@link Path} to be added to the list of inputs for 
+   *            the map-reduce job.
+   */
   public void addInputPath(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
     String dirs = get("mapred.input.dir");
     set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
   }
 
+  /**
+   * Get the list of input {@link Path}s for the map-reduce job.
+   * 
+   * @return the list of input {@link Path}s for the map-reduce job.
+   */
   public Path[] getInputPaths() {
     String dirs = get("mapred.input.dir", "");
     ArrayList list = Collections.list(new StringTokenizer(dirs, ","));
@@ -187,6 +278,7 @@
 
   /**
    * Get the reported username for this job.
+   * 
    * @return the username
    */
   public String getUser() {
@@ -195,7 +287,8 @@
   
   /**
    * Set the reported username for this job.
-   * @param user the username
+   * 
+   * @param user the username for this job.
    */
   public void setUser(String user) {
     set("user.name", user);
@@ -206,6 +299,10 @@
   /**
    * Set whether the framework should keep the intermediate files for 
    * failed tasks.
+   * 
+   * @param keep <code>true</code> if framework should keep the intermediate files 
+   *             for failed tasks, <code>false</code> otherwise.
+   * 
    */
   public void setKeepFailedTaskFiles(boolean keep) {
     setBoolean("keep.failed.task.files", keep);
@@ -213,6 +310,7 @@
   
   /**
    * Should the temporary files for failed tasks be kept?
+   * 
    * @return should the files be kept?
    */
   public boolean getKeepFailedTaskFiles() {
@@ -223,6 +321,7 @@
    * Set a regular expression for task names that should be kept. 
    * The regular expression ".*_m_000123_0" would keep the files
    * for the first instance of map 123 that ran.
+   * 
    * @param pattern the java.util.regex.Pattern to match against the 
    *        task names.
    */
@@ -233,15 +332,17 @@
   /**
    * Get the regular expression that is matched against the task names
    * to see if we need to keep the files.
-   * @return the pattern as a string, if it was set, othewise null
+   * 
+   * @return the pattern as a string, if it was set, otherwise null.
    */
   public String getKeepTaskFilesPattern() {
     return get("keep.task.files.pattern");
   }
   
   /**
-   * Set the current working directory for the default file system
-   * @param dir the new current working directory
+   * Set the current working directory for the default file system.
+   * 
+   * @param dir the new current working directory.
    */
   public void setWorkingDirectory(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
@@ -250,7 +351,8 @@
   
   /**
    * Get the current working directory for the default file system.
-   * @return the directory name
+   * 
+   * @return the directory name.
    */
   public Path getWorkingDirectory() {
     String name = get("mapred.working.dir");
@@ -267,31 +369,107 @@
     }
   }
   
+  /**
+   * Get the {@link Path} to the output directory for the map-reduce job.
+   * 
+   * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
+   * 
+   * <p>Some applications need to create/write-to side-files, which differ from
+   * the actual job-outputs.</p>
+   * 
+   * <p>In such cases there could be issues with 2 instances of the same TIP 
+   * (running simultaneously e.g. speculative tasks) trying to open/write-to the
+   * same file (path) on HDFS. Hence the application-writer will have to pick 
+   * unique names per task-attempt (e.g. using the taskid, say 
+   * <tt>task_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
+   * 
+   * <p>To get around this the Map-Reduce framework helps the application-writer 
+   * out by maintaining a special <tt>${mapred.output.dir}/_${taskid}</tt> 
+   * sub-directory for each task-attempt on HDFS where the output of the 
+   * task-attempt goes. On successful completion of the task-attempt the files 
+   * in the <tt>${mapred.output.dir}/_${taskid}</tt> (only) 
+   * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the 
+   * framework discards the sub-directory of unsuccessful task-attempts. This 
+   * is completely transparent to the application.</p>
+   * 
+   * <p>The application-writer can take advantage of this by creating any 
+   * side-files required in <tt>${mapred.output.dir}</tt> during execution of the 
+   * reduce-task i.e. via {@link #getOutputPath()}, and the framework will move 
+   * them out similarly - thus there is no need to pick unique paths per 
+   * task-attempt.</p>
+   * 
+   * <p><i>Note</i>: the value of <tt>${mapred.output.dir}</tt> during execution 
+   * of a particular task-attempt is actually 
+   * <tt>${mapred.output.dir}/_${taskid}</tt>, not the value set by 
+   * {@link #setOutputPath(Path)}. So, just create any side-files in the path 
+   * returned by {@link #getOutputPath()} from map/reduce task to take 
+   * advantage of this feature.</p>
+   * 
+   * <p>The entire discussion holds true for maps of jobs with 
+   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
+   * goes directly to HDFS.</p> 
+   * 
+   * @return the {@link Path} to the output directory for the map-reduce job.
+   */
   public Path getOutputPath() { 
     String name = get("mapred.output.dir");
     return name == null ? null: new Path(name);
   }
 
+  /**
+   * Set the {@link Path} of the output directory for the map-reduce job.
+   * 
+   * <p><i>Note</i>:
+   * </p>
+   * @param dir the {@link Path} of the output directory for the map-reduce job.
+   */
   public void setOutputPath(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
     set("mapred.output.dir", dir.toString());
   }
 
+  /**
+   * Get the {@link InputFormat} implementation for the map-reduce job,
+   * defaults to {@link TextInputFormat} if not specified explicitly.
+   * 
+   * @return the {@link InputFormat} implementation for the map-reduce job.
+   */
   public InputFormat getInputFormat() {
     return (InputFormat)ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                              TextInputFormat.class,
                                                              InputFormat.class),
                                                     this);
   }
+  
+  /**
+   * Set the {@link InputFormat} implementation for the map-reduce job.
+   * 
+   * @param theClass the {@link InputFormat} implementation for the map-reduce 
+   *                 job.
+   */
   public void setInputFormat(Class<? extends InputFormat> theClass) {
     setClass("mapred.input.format.class", theClass, InputFormat.class);
   }
+  
+  /**
+   * Get the {@link OutputFormat} implementation for the map-reduce job,
+   * defaults to {@link TextOutputFormat} if not specified explicitly.
+   * 
+   * @return the {@link OutputFormat} implementation for the map-reduce job.
+   */
   public OutputFormat getOutputFormat() {
     return (OutputFormat)ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                               TextOutputFormat.class,
                                                               OutputFormat.class),
                                                      this);
   }
+  
+  /**
+   * Set the {@link OutputFormat} implementation for the map-reduce job.
+   * 
+   * @param theClass the {@link OutputFormat} implementation for the map-reduce 
+   *                 job.
+   */
   public void setOutputFormat(Class<? extends OutputFormat> theClass) {
     setClass("mapred.output.format.class", theClass, OutputFormat.class);
   }
@@ -320,6 +498,7 @@
   /**
    * Should the map outputs be compressed before transfer?
    * Uses the SequenceFile compression.
+   * 
    * @param compress should the map outputs be compressed?
    */
   public void setCompressMapOutput(boolean compress) {
@@ -328,8 +507,9 @@
   
   /**
    * Are the outputs of the maps be compressed?
+   * 
    * @return <code>true</code> if the outputs of the maps are to be compressed,
-   *         <code>false</code> otherwise
+   *         <code>false</code> otherwise.
    */
   public boolean getCompressMapOutput() {
     return getBoolean("mapred.compress.map.output", false);
@@ -337,8 +517,9 @@
 
   /**
    * Set the {@link CompressionType} for the map outputs.
+   * 
    * @param style the {@link CompressionType} to control how the map outputs  
-   *              are compressed
+   *              are compressed.
    */
   public void setMapOutputCompressionType(CompressionType style) {
     set("mapred.map.output.compression.type", style.toString());
@@ -346,8 +527,9 @@
   
   /**
    * Get the {@link CompressionType} for the map outputs.
+   * 
    * @return the {@link CompressionType} for map outputs, defaulting to 
-   *         {@link CompressionType#RECORD} 
+   *         {@link CompressionType#RECORD}. 
    */
   public CompressionType getMapOutputCompressionType() {
     String val = get("mapred.map.output.compression.type", 
@@ -357,8 +539,9 @@
 
   /**
    * Set the given class as the  {@link CompressionCodec} for the map outputs.
+   * 
    * @param codecClass the {@link CompressionCodec} class that will compress  
-   *                   the map outputs
+   *                   the map outputs.
    */
   public void 
   setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
@@ -368,9 +551,10 @@
   
   /**
    * Get the {@link CompressionCodec} for compressing the map outputs.
+   * 
    * @param defaultValue the {@link CompressionCodec} to return if not set
    * @return the {@link CompressionCodec} class that should be used to compress the 
-   *         map outputs
+   *         map outputs.
    * @throws IllegalArgumentException if the class was specified, but not found
    */
   public Class<? extends CompressionCodec> 
@@ -390,10 +574,10 @@
   
   /**
    * Get the key class for the map output data. If it is not set, use the
-   * (final) output ket class This allows the map output key class to be
-   * different than the final output key class
-   * 
-   * @return map output key class
+   * (final) output key class. This allows the map output key class to be
+   * different than the final output key class.
+   *  
+   * @return the map output key class.
    */
   public Class<? extends WritableComparable> getMapOutputKeyClass() {
     Class<? extends WritableComparable> retv = getClass("mapred.mapoutput.key.class", null,
@@ -407,7 +591,9 @@
   /**
    * Set the key class for the map output data. This allows the user to
    * specify the map output key class to be different than the final output
-   * value class
+   * key class.
+   * 
+   * @param theClass the map output key class.
    */
   public void setMapOutputKeyClass(Class<? extends WritableComparable> theClass) {
     setClass("mapred.mapoutput.key.class", theClass,
@@ -417,9 +603,9 @@
   /**
    * Get the value class for the map output data. If it is not set, use the
    * (final) output value class This allows the map output value class to be
-   * different than the final output value class
-   * 
-   * @return map output value class
+   * different than the final output value class.
+   *  
+   * @return the map output value class.
    */
   public Class<? extends Writable> getMapOutputValueClass() {
     Class<? extends Writable> retv = getClass("mapred.mapoutput.value.class", null,
@@ -433,21 +619,38 @@
   /**
    * Set the value class for the map output data. This allows the user to
    * specify the map output value class to be different than the final output
-   * value class
+   * value class.
+   * 
+   * @param theClass the map output value class.
    */
   public void setMapOutputValueClass(Class<? extends Writable> theClass) {
     setClass("mapred.mapoutput.value.class", theClass, Writable.class);
   }
   
+  /**
+   * Get the key class for the job output data.
+   * 
+   * @return the key class for the job output data.
+   */
   public Class<? extends WritableComparable> getOutputKeyClass() {
     return getClass("mapred.output.key.class",
                     LongWritable.class, WritableComparable.class);
   }
   
+  /**
+   * Set the key class for the job output data.
+   * 
+   * @param theClass the key class for the job output data.
+   */
   public void setOutputKeyClass(Class<? extends WritableComparable> theClass) {
     setClass("mapred.output.key.class", theClass, WritableComparable.class);
   }
 
+  /**
+   * Get the {@link WritableComparable} comparator used to compare keys.
+   * 
+   * @return the {@link WritableComparable} comparator used to compare keys.
+   */
   public WritableComparator getOutputKeyComparator() {
     Class theClass = getClass("mapred.output.key.comparator.class", null,
                               WritableComparator.class);
@@ -456,17 +659,24 @@
     return WritableComparator.get(getMapOutputKeyClass());
   }
 
+  /**
+   * Set the {@link WritableComparable} comparator used to compare keys.
+   * 
+   * @param theClass the {@link WritableComparable} comparator used to 
+   *                 compare keys.
+   * @see #setOutputValueGroupingComparator(Class)                 
+   */
   public void setOutputKeyComparatorClass(Class<? extends WritableComparator> theClass) {
     setClass("mapred.output.key.comparator.class",
              theClass, WritableComparator.class);
   }
 
-  /** Get the user defined comparator for grouping values.
+  /** 
+   * Get the user defined {@link WritableComparable} comparator for 
+   * grouping keys of inputs to the reduce.
    * 
-   * This call is used to get the comparator for grouping values by key.
-   * @see #setOutputValueGroupingComparator(Class) for details.
-   *  
-   * @return Comparator set by the user for grouping values.
+   * @return comparator set by the user for grouping values.
+   * @see #setOutputValueGroupingComparator(Class) for details.  
    */
   public WritableComparator getOutputValueGroupingComparator() {
     Class theClass = getClass("mapred.output.value.groupfn.class", null,
@@ -478,120 +688,310 @@
     return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
   }
 
-  /** Set the user defined comparator for grouping values.
+  /** 
+   * Set the user defined {@link WritableComparable} comparator for 
+   * grouping keys in the input to the reduce.
    * 
-   * For key-value pairs (K1,V1) and (K2,V2), the values are passed
-   * in a single call to the map function if K1 and K2 compare as equal.
+   * <p>This comparator should be provided if the equivalence rules for keys
+   * for sorting the intermediates are different from those for grouping keys
+   * before each call to 
+   * {@link Reducer#reduce(WritableComparable, java.util.Iterator, OutputCollector, Reporter)}.</p>
+   *  
+   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
    * 
-   * This comparator should be provided if the equivalence rules for keys
-   * for sorting the intermediates are different from those for grouping 
-   * values.
+   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
+   * how keys are sorted, this can be used in conjunction to simulate 
+   * <i>secondary sort on values</i>.</p>
+   *  
+   * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
+   * <i>stable</i> in any sense. (In any case, with the order of available 
+   * map-outputs to the reduce being non-deterministic, it wouldn't make 
+   * that much sense.)</p>
    * 
-   * @param theClass The Comparator class to be used for grouping. It should
-   * extend WritableComparator.
+   * @param theClass the comparator class to be used for grouping keys. 
+   *                 It should extend <code>WritableComparator</code>.
+   * @see #setOutputKeyComparatorClass(Class)                 
    */
   public void setOutputValueGroupingComparator(Class theClass) {
     setClass("mapred.output.value.groupfn.class",
              theClass, WritableComparator.class);
   }
 
+  /**
+   * Get the value class for job outputs.
+   * 
+   * @return the value class for job outputs.
+   */
   public Class<? extends Writable> getOutputValueClass() {
     return getClass("mapred.output.value.class", Text.class, Writable.class);
   }
+  
+  /**
+   * Set the value class for job outputs.
+   * 
+   * @param theClass the value class for job outputs.
+   */
   public void setOutputValueClass(Class<? extends Writable> theClass) {
     setClass("mapred.output.value.class", theClass, Writable.class);
   }
 
-
+  /**
+   * Get the {@link Mapper} class for the job.
+   * 
+   * @return the {@link Mapper} class for the job.
+   */
   public Class<? extends Mapper> getMapperClass() {
     return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
   }
+  
+  /**
+   * Set the {@link Mapper} class for the job.
+   * 
+   * @param theClass the {@link Mapper} class for the job.
+   */
   public void setMapperClass(Class<? extends Mapper> theClass) {
     setClass("mapred.mapper.class", theClass, Mapper.class);
   }
 
+  /**
+   * Get the {@link MapRunnable} class for the job.
+   * 
+   * @return the {@link MapRunnable} class for the job.
+   */
   public Class<? extends MapRunnable> getMapRunnerClass() {
     return getClass("mapred.map.runner.class",
                     MapRunner.class, MapRunnable.class);
   }
+  
+  /**
+   * Expert: Set the {@link MapRunnable} class for the job.
+   * 
+   * Typically used to exert greater control on {@link Mapper}s.
+   * 
+   * @param theClass the {@link MapRunnable} class for the job.
+   */
   public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
     setClass("mapred.map.runner.class", theClass, MapRunnable.class);
   }
 
+  /**
+   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
+   * to be sent to the {@link Reducer}s.
+   * 
+   * @return the {@link Partitioner} used to partition map-outputs.
+   */
   public Class<? extends Partitioner> getPartitionerClass() {
     return getClass("mapred.partitioner.class",
                     HashPartitioner.class, Partitioner.class);
   }
+  
+  /**
+   * Set the {@link Partitioner} class used to partition 
+   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
+   * 
+   * @param theClass the {@link Partitioner} used to partition map-outputs.
+   */
   public void setPartitionerClass(Class<? extends Partitioner> theClass) {
     setClass("mapred.partitioner.class", theClass, Partitioner.class);
   }
 
+  /**
+   * Get the {@link Reducer} class for the job.
+   * 
+   * @return the {@link Reducer} class for the job.
+   */
   public Class<? extends Reducer> getReducerClass() {
     return getClass("mapred.reducer.class",
                     IdentityReducer.class, Reducer.class);
   }
+  
+  /**
+   * Set the {@link Reducer} class for the job.
+   * 
+   * @param theClass the {@link Reducer} class for the job.
+   */
   public void setReducerClass(Class<? extends Reducer> theClass) {
     setClass("mapred.reducer.class", theClass, Reducer.class);
   }
 
+  /**
+   * Get the user-defined <i>combiner</i> class used to combine map-outputs 
+   * before being sent to the reducers. Typically the combiner is the same as
+   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
+   * 
+   * @return the user-defined combiner class used to combine map-outputs.
+   */
   public Class<? extends Reducer> getCombinerClass() {
     return getClass("mapred.combiner.class", null, Reducer.class);
   }
+
+  /**
+   * Set the user-defined <i>combiner</i> class used to combine map-outputs 
+   * before being sent to the reducers. 
+   * 
+   * <p>The combiner is a task-level aggregation operation which, in some cases,
+   * helps to cut down the amount of data transferred from the {@link Mapper} to
+   * the {@link Reducer}, leading to better performance.</p>
+   *  
+   * <p>Typically the combiner is the same as the <code>Reducer</code> for the  
+   * job i.e. {@link #setReducerClass(Class)}.</p>
+   * 
+   * @param theClass the user-defined combiner class used to combine 
+   *                 map-outputs.
+   */
   public void setCombinerClass(Class<? extends Reducer> theClass) {
     setClass("mapred.combiner.class", theClass, Reducer.class);
   }
   
   /**
-   * Should speculative execution be used for this job?
-   * @return Defaults to true
+   * Should speculative execution be used for this job? 
+   * Defaults to <code>true</code>.
+   * 
+   * @return <code>true</code> if speculative execution should be used for this job,
+   *         <code>false</code> otherwise.
    */
   public boolean getSpeculativeExecution() { 
     return getBoolean("mapred.speculative.execution", true);
   }
   
   /**
-   * Turn on or off speculative execution for this job.
-   * In general, it should be turned off for map jobs that have side effects.
+   * Turn speculative execution on or off for this job. 
+   * 
+   * @param speculativeExecution <code>true</code> if speculative execution 
+   *                             should be turned on, else <code>false</code>.
    */
-  public void setSpeculativeExecution(boolean new_val) {
-    setBoolean("mapred.speculative.execution", new_val);
+  public void setSpeculativeExecution(boolean speculativeExecution) {
+    setBoolean("mapred.speculative.execution", speculativeExecution);
   }
   
+  /**
+   * Get the configured number of map tasks for this job.
+   * Defaults to <code>1</code>.
+   * 
+   * @return the number of map tasks for this job.
+   */
   public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
+  
+  /**
+   * Set the number of map tasks for this job.
+   * 
+   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
+   * number of spawned map tasks depends on the number of {@link InputSplit}s 
+   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
+   *  
+   * A custom {@link InputFormat} is typically used to accurately control 
+   * the number of map tasks for the job.</p>
+   * 
+   * <h4 id="NoOfMaps">How many maps?</h4>
+   * 
+   * <p>The number of maps is usually driven by the total size of the inputs 
+   * i.e. total number of blocks of the input files.</p>
+   *  
+   * <p>The right level of parallelism for maps seems to be around 10-100 maps 
+   * per-node, although it has been set up to 300 or so for very cpu-light map 
+   * tasks. Task setup takes a while, so it is best if the maps take at least a 
+   * minute to execute.</p>
+   * 
+   * <p>The default behavior of file-based {@link InputFormat}s is to split the 
+   * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
+   * bytes, of input files. However, the {@link FileSystem} blocksize of the 
+   * input files is treated as an upper bound for input splits. A lower bound 
+   * on the split size can be set via 
+   * <a href="{@docRoot}/../hadoop-default.html#mapred.min.split.size">
+   * mapred.min.split.size</a>.</p>
+   *  
+   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
+   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
+   * used to set it even higher.</p>
+   * 
+   * @param n the number of map tasks for this job.
+   * @see InputFormat#getSplits(JobConf, int)
+   * @see FileInputFormat
+   * @see FileSystem#getDefaultBlockSize()
+   * @see FileStatus#getBlockSize()
+   */
   public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
 
+  /**
+   * Get the configured number of reduce tasks for this job. Defaults to 
+   * <code>1</code>.
+   * 
+   * @return the number of reduce tasks for this job.
+   */
   public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
+  
+  /**
+   * Set the requisite number of reduce tasks for this job.
+   * 
+   * <h4 id="NoOfReduces">How many reduces?</h4>
+   * 
+   * <p>The right number of reduces seems to be <code>0.95</code> or 
+   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
+   * <a href="{@docRoot}/../hadoop-default.html#mapred.tasktracker.tasks.maximum">
+   * mapred.tasktracker.tasks.maximum</a>).
+   * </p>
+   * 
+   * <p>With <code>0.95</code> all of the reduces can launch immediately and 
+   * start transferring map outputs as the maps finish. With <code>1.75</code> 
+   * the faster nodes will finish their first round of reduces and launch a 
+   * second wave of reduces doing a much better job of load balancing.</p>
+   * 
+   * <p>Increasing the number of reduces increases the framework overhead, but 
+   * increases load balancing and lowers the cost of failures.</p>
+   * 
+   * <p>The scaling factors above are slightly less than whole numbers to 
+   * reserve a few reduce slots in the framework for speculative-tasks, failures
+   * etc.</p> 
+   *
+   * <h4 id="ReducerNone">Reducer NONE</h4>
+   * 
+   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
+   * 
+   * <p>In this case the outputs of the map-tasks go directly to the distributed 
+   * file-system, to the path set by {@link #setOutputPath(Path)}. Also, the 
+   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
+   * 
+   * @param n the number of reduce tasks for this job.
+   */
   public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
   
-  /** Get the configured number of maximum attempts that will be made to run a
-   *  map task, as specified by the <code>mapred.map.max.attempts</code>
-   *  property. If this property is not already set, the default is 4 attempts
-   * @return the max number of attempts
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * map task, as specified by the <code>mapred.map.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   *  
+   * @return the max number of attempts per map task.
    */
   public int getMaxMapAttempts() {
     return getInt("mapred.map.max.attempts", 4);
   }
-  /** Expert: Set the number of maximum attempts that will be made to run a
-   *  map task
-   * @param n the number of attempts
-   *
+  
+  /** 
+   * Expert: Set the number of maximum attempts that will be made to run a
+   * map task.
+   * 
+   * @param n the number of attempts per map task.
    */
   public void setMaxMapAttempts(int n) {
     setInt("mapred.map.max.attempts", n);
   }
 
-  /** Get the configured number of maximum attempts  that will be made to run a
-   *  reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
-   *  property. If this property is not already set, the default is 4 attempts
-   * @return the max number of attempts
+  /** 
+   * Get the configured number of maximum attempts  that will be made to run a
+   * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   * 
+   * @return the max number of attempts per reduce task.
    */
   public int getMaxReduceAttempts() {
     return getInt("mapred.reduce.max.attempts", 4);
   }
-  /** Expert: Set the number of maximum attempts that will be made to run a
-   *  reduce task
-   * @param n the number of attempts
-   *
+  /** 
+   * Expert: Set the number of maximum attempts that will be made to run a
+   * reduce task.
+   * 
+   * @param n the number of attempts per reduce task.
    */
   public void setMaxReduceAttempts(int n) {
     setInt("mapred.reduce.max.attempts", n);
@@ -600,7 +1000,8 @@
   /**
    * Get the user-specified job name. This is only used to identify the 
    * job to the user.
-   * @return the job's name, defaulting to ""
+   * 
+   * @return the job's name, defaulting to "".
    */
   public String getJobName() {
     return get("mapred.job.name", "");
@@ -608,7 +1009,8 @@
   
   /**
    * Set the user-specified job name.
-   * @param name the job's new name
+   * 
+   * @param name the job's new name.
    */
   public void setJobName(String name) {
     set("mapred.job.name", name);
@@ -627,16 +1029,16 @@
    * When not running under HOD, this identifer is expected to remain set to 
    * the empty string.
    *
-   * @return the session identifier, defaulting to ""
+   * @return the session identifier, defaulting to "".
    */
   public String getSessionId() {
       return get("session.id", "");
   }
   
   /**
-   * Set the user-specified session idengifier.  
+   * Set the user-specified session identifier.  
    *
-   * @param sessionId the new session id
+   * @param sessionId the new session id.
    */
   public void setSessionId(String sessionId) {
       set("session.id", sessionId);
@@ -644,6 +1046,8 @@
     
   /**
    * Set the maximum no. of failures of a given job per tasktracker.
+   * If the no. of task failures exceeds <code>noFailures</code>, the 
+   * tasktracker is <i>blacklisted</i> for this job. 
    * 
    * @param noFailures maximum no. of failures of a given job per tasktracker.
    */
@@ -652,7 +1056,9 @@
   }
   
   /**
-   * Get the maximum no. of failures of a given job per tasktracker.
+   * Expert: Get the maximum no. of failures of a given job per tasktracker.
+   * If the no. of task failures exceeds this, the tasktracker is
+   * <i>blacklisted</i> for this job. 
    * 
    * @return the maximum no. of failures of a given job per tasktracker.
    */
@@ -662,21 +1068,30 @@
 
   /**
    * Get the maximum percentage of map tasks that can fail without 
-   * the job being aborted.
+   * the job being aborted. 
+   * 
+   * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
+   * attempts before being declared as <i>failed</i>.
+   *  
+   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
+   * the job being declared as {@link JobStatus#FAILED}.
    * 
    * @return the maximum percentage of map tasks that can fail without
-   *         the job being aborted
+   *         the job being aborted.
    */
   public int getMaxMapTaskFailuresPercent() {
     return getInt("mapred.max.map.failures.percent", 0);
   }
 
   /**
-   * Set the maximum percentage of map tasks that can fail without the job
-   * being aborted.
+   * Expert: Set the maximum percentage of map tasks that can fail without the
+   * job being aborted. 
+   * 
+   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
+   * before being declared as <i>failed</i>.
    * 
    * @param percent the maximum percentage of map tasks that can fail without 
-   *                the job being aborted
+   *                the job being aborted.
    */
   public void setMaxMapTaskFailuresPercent(int percent) {
     setInt("mapred.max.map.failures.percent", percent);
@@ -684,10 +1099,16 @@
   
   /**
    * Get the maximum percentage of reduce tasks that can fail without 
-   * the job being aborted.
+   * the job being aborted. 
+   * 
+   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
+   * attempts before being declared as <i>failed</i>.
+   * 
+   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
+   * in the job being declared as {@link JobStatus#FAILED}.
    * 
    * @return the maximum percentage of reduce tasks that can fail without
-   *         the job being aborted
+   *         the job being aborted.
    */
   public int getMaxReduceTaskFailuresPercent() {
     return getInt("mapred.max.reduce.failures.percent", 0);
@@ -697,24 +1118,29 @@
    * Set the maximum percentage of reduce tasks that can fail without the job
    * being aborted.
    * 
+   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
+   * attempts before being declared as <i>failed</i>.
+   * 
    * @param percent the maximum percentage of reduce tasks that can fail without 
-   *                the job being aborted
+   *                the job being aborted.
    */
   public void setMaxReduceTaskFailuresPercent(int percent) {
     setInt("mapred.max.reduce.failures.percent", percent);
   }
   
   /**
-   * Set job priority for this job.
+   * Set {@link JobPriority} for this job.
    * 
-   * @param prio
+   * @param prio the {@link JobPriority} for this job.
    */
   public void setJobPriority(JobPriority prio) {
     set("mapred.job.priority", prio.toString());
   }
   
   /**
-   * Get the job priority for this job.
+   * Get the {@link JobPriority} for this job.
+   * 
+   * @return the {@link JobPriority} for this job.
    */
   public JobPriority getJobPriority() {
     String prio = get("mapred.job.priority");
@@ -725,12 +1151,44 @@
     return JobPriority.valueOf(prio);
   }
   
+  /**
+   * Get the uri to be invoked in-order to send a notification after the job 
+   * has completed (success/failure). 
+   * 
+   * @return the job end notification uri, <code>null</code> if it hasn't
+   *         been set.
+   * @see #setJobEndNotificationURI(String)
+   */
+  public String getJobEndNotificationURI() {
+    return get("job.end.notification.url");
+  }
+
+  /**
+   * Set the uri to be invoked in-order to send a notification after the job
+   * has completed (success/failure).
+   * 
+   * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
+   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
+   * identifier and completion-status respectively.</p>
+   * 
+   * <p>This is typically used by application-writers to implement chaining of 
+   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
+   * 
+   * @param uri the job end notification uri
+   * @see JobStatus
+   * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">Job Completion and Chaining</a>
+   */
+  public void setJobEndNotificationURI(String uri) {
+    set("job.end.notification.url", uri);
+  }
   
-  /** Find a jar that contains a class of the same name, if any.
+  /** 
+   * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
    * on the class path that has a class with the same name.
-   * @param my_class the class to find
-   * @return a jar file that contains the class, or null
+   * 
+   * @param my_class the class to find.
+   * @return a jar file that contains the class, or null.
    * @throws IOException
    */
   private static String findContainingJar(Class my_class) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Oct 24 14:18:51 2007
@@ -95,7 +95,7 @@
   private static JobEndStatusInfo createNotification(JobConf conf,
                                                      JobStatus status) {
     JobEndStatusInfo notification = null;
-    String uri = conf.get("job.end.notification.url");
+    String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
       // +1 to make logic for first notification identical to a retry
       int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java Wed Oct 24 14:18:51 2007
@@ -23,8 +23,11 @@
 import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.mapred.JobConfigurable;
 
-/** Base class for {@link Mapper} and {@link Reducer} implementations.
- * Provides default implementations for a few methods.
+/** 
+ * Base class for {@link Mapper} and {@link Reducer} implementations.
+ * 
+ * <p>Provides default no-op implementations for a few methods; most 
+ * non-trivial applications need to override some of them.</p>
  */
 public class MapReduceBase implements Closeable, JobConfigurable {
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java Wed Oct 24 14:18:51 2007
@@ -23,15 +23,28 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Expert: Permits greater control of map processing. For example,
- * implementations might perform multi-threaded, asynchronous mappings. */
+/**
+ * Expert: Generic interface for {@link Mapper}s.
+ * 
+ * <p>Custom implementations of <code>MapRunnable</code> can exert greater 
+ * control on map processing e.g. multi-threaded, asynchronous mappers etc.</p>
+ * 
+ * @see Mapper
+ */
 public interface MapRunnable<K1 extends WritableComparable, V1 extends Writable,
                              K2 extends WritableComparable, V2 extends Writable>
     extends JobConfigurable {
   
-  /** Called to execute mapping.  Mapping is complete when this returns.
-   * @param input the {@link RecordReader} with input key/value pairs.
-   * @param output the {@link OutputCollector} for mapped key/value pairs.
+  /** 
+   * Start mapping input <tt>&lt;key, value&gt;</tt> pairs.
+   *  
+   * <p>Mapping of input records to output records is complete when this method 
+   * returns.</p>
+   * 
+   * @param input the {@link RecordReader} to read the input records.
+   * @param output the {@link OutputCollector} to collect the output records.
+   * @param reporter {@link Reporter} to report progress, status-updates etc.
+   * @throws IOException
    */
   void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
            Reporter reporter)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java Wed Oct 24 14:18:51 2007
@@ -20,29 +20,141 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 
-/** Maps input key/value pairs to a set of intermediate key/value pairs.  All
- * intermediate values associated with a given output key are subsequently
- * grouped by the map/reduce system, and passed to a {@link Reducer} to
- * determine the final output.. */
+/** 
+ * Maps input key/value pairs to a set of intermediate key/value pairs.  
+ * 
+ * <p>Maps are the individual tasks which transform input records into 
+ * intermediate records. The transformed intermediate records need not be of 
+ * the same type as the input records. A given input pair may map to zero or 
+ * many output pairs.</p> 
+ * 
+ * <p>The Hadoop Map-Reduce framework spawns one map task for each 
+ * {@link InputSplit} generated by the {@link InputFormat} for the job.
+ * <code>Mapper</code> implementations can access the {@link JobConf} for the 
+ * job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ * themselves. Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.</p>
+ * 
+ * <p>The framework then calls 
+ * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)} 
+ * for each key/value pair in the <code>InputSplit</code> for that task.</p>
+ * 
+ * <p>All intermediate values associated with a given output key are 
+ * subsequently grouped by the framework, and passed to a {@link Reducer} to  
+ * determine the final output. Users can control the grouping by specifying
+ * a <code>Comparator</code> via 
+ * {@link JobConf#setOutputKeyComparatorClass(Class)}.</p>
+ *
+ * <p>The grouped <code>Mapper</code> outputs are partitioned per 
+ * <code>Reducer</code>. Users can control which keys (and hence records) go to 
+ * which <code>Reducer</code> by implementing a custom {@link Partitioner}.</p>
+ * 
+ * <p>Users can optionally specify a <code>combiner</code>, via 
+ * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the 
+ * intermediate outputs, which helps to cut down the amount of data transferred 
+ * from the <code>Mapper</code> to the <code>Reducer</code>.</p>
+ * 
+ * <p>The intermediate, grouped outputs are always stored in 
+ * {@link SequenceFile}s. Applications can specify if and how the intermediate
+ * outputs are to be compressed and which {@link CompressionCodec}s are to be
+ * used via the <code>JobConf</code>.</p>
+ *  
+ * <p>If the job has 
+ * <a href="{@docRoot}/org/apache/hadoop/mapred/JobConf.html#ReducerNone">zero
+ * reduces</a> then the output of the <code>Mapper</code> is directly written
+ * to the {@link FileSystem} without grouping by keys.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ *     public class MyMapper&lt;K extends WritableComparable, V extends Writable&gt; 
+ *     extends MapReduceBase implements Mapper&lt;K, V, K, V&gt; {
+ *     
+ *       static enum MyCounters { NUM_RECORDS }
+ *       
+ *       private String mapTaskId;
+ *       private String inputFile;
+ *       private int noRecords = 0;
+ *       
+ *       public void configure(JobConf job) {
+ *         mapTaskId = job.get("mapred.task.id");
+ *         inputFile = job.get("mapred.input.file");
+ *       }
+ *       
+ *       public void map(K key, V val,
+ *                       OutputCollector&lt;K, V&gt; output, Reporter reporter)
+ *       throws IOException {
+ *         // Process the &lt;key, value&gt; pair (assume this takes a while)
+ *         // ...
+ *         // ...
+ *         
+ *         // Let the framework know that we are alive, and kicking!
+ *         // reporter.progress();
+ *         
+ *         // Process some more
+ *         // ...
+ *         // ...
+ *         
+ *         // Increment the no. of &lt;key, value&gt; pairs processed
+ *         ++noRecords;
+ *
+ *         // Increment counters
+ *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
+ *        
+ *         // Every 100 records update application-level status
+ *         if ((noRecords%100) == 0) {
+ *           reporter.setStatus(mapTaskId + " processed " + noRecords + 
+ *                              " from input-file: " + inputFile); 
+ *         }
+ *         
+ *         // Output the result
+ *         output.collect(key, val);
+ *       }
+ *     }
+ * </pre></blockquote></p>
+ * <p>Applications may write a custom {@link MapRunnable} to exert greater
+ * control on map processing e.g. multi-threaded <code>Mapper</code>s etc.</p>
+ * 
+ * @see JobConf
+ * @see InputFormat
+ * @see Partitioner  
+ * @see Reducer
+ * @see MapReduceBase
+ * @see MapRunnable
+ * @see SequenceFile
+ */
 public interface Mapper<K1 extends WritableComparable, V1 extends Writable,
                         K2 extends WritableComparable, V2 extends Writable>
   extends JobConfigurable, Closeable {
   
-  /** Maps a single input key/value pair into intermediate key/value pairs.
-   * Output pairs need not be of the same types as input pairs.  A given input
-   * pair may map to zero or many output pairs.  Output pairs are collected
-   * with calls to {@link
-   * OutputCollector#collect(WritableComparable,Writable)}.
+  /** 
+   * Maps a single input key/value pair into an intermediate key/value pair.
+   * 
+   * <p>Output pairs need not be of the same types as input pairs.  A given 
+   * input pair may map to zero or many output pairs.  Output pairs are 
+   * collected with calls to 
+   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
    *
-   * @param key the key
-   * @param value the values
-   * @param output collects mapped keys and values
+   * <p>Applications can use the {@link Reporter} provided to report progress 
+   * or just indicate that they are alive. In scenarios where the application 
+   * takes a significant amount of time to process individual key/value 
+   * pairs, this is crucial since the framework might assume that the task has 
+   * timed-out and kill that task. The other way of avoiding this is to set 
+   * <a href="{@docRoot}/../hadoop-default.html#mapred.task.timeout">
+   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
+   * time-outs).</p>
+   * 
+   * @param key the input key.
+   * @param value the input value.
+   * @param output collects mapped keys and values.
+   * @param reporter facility to report progress.
    */
-  void map(K1 key, V1 value,
-           OutputCollector<K2, V2> output, Reporter reporter)
-    throws IOException;
+  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
+  throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java Wed Oct 24 14:18:51 2007
@@ -24,15 +24,23 @@
 import org.apache.hadoop.io.WritableComparable;
 
 
-/** Passed to {@link Mapper} and {@link Reducer} implementations to collect
- * output data. */
+/**
+ * Collects the <code>&lt;key, value&gt;</code> pairs output by {@link Mapper}s
+ * and {@link Reducer}s.
+ *  
+ * <p><code>OutputCollector</code> is the generalization of the facility 
+ * provided by the Map-Reduce framework to collect data output by either the 
+ * <code>Mapper</code> or the <code>Reducer</code> i.e. intermediate outputs 
+ * or the output of the job.</p>  
+ */
 public interface OutputCollector<K extends WritableComparable,
                                  V extends Writable> {
   
   /** Adds a key/value pair to the output.
    *
-   * @param key the key to add
-   * @param value to value to add
+   * @param key the key to collect.
+   * @param value the value to collect.
+   * @throws IOException
    */
   void collect(K key, V value) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Wed Oct 24 14:18:51 2007
@@ -25,28 +25,53 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-/** An output data format.  Output files are stored in a {@link
- * FileSystem}. */
+/** 
+ * <code>OutputFormat</code> describes the output-specification for a 
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
+ * job to:</p>
+ * <ol>
+ *   <li>
+ *   Validate the output-specification of the job. For example, check that 
+ *   the output directory doesn't already exist.</li>
+ *   <li>
+ *   Provide the {@link RecordWriter} implementation to be used to write out
+ *   the output files of the job. Output files are stored in a 
+ *   {@link FileSystem}.
+ *   </li>
+ * </ol>
+ * 
+ * @see RecordWriter
+ * @see JobConf
+ */
 public interface OutputFormat<K extends WritableComparable,
                               V extends Writable> {
 
-  /** Construct a {@link RecordWriter} with Progressable.
+  /** 
+   * Get the {@link RecordWriter} for the given job.
    *
-   * @param job the job whose output is being written
-   * @param name the unique name for this part of the output
-   * @param progress mechanism for reporting progress while writing to file
-   * @return a {@link RecordWriter}
+   * @param ignored
+   * @param job configuration for the job whose output is being written.
+   * @param name the unique name for this part of the output.
+   * @param progress mechanism for reporting progress while writing to file.
+   * @return a {@link RecordWriter} to write the output for the job.
+   * @throws IOException
    */
   RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                      String name, Progressable progress)
-    throws IOException;
+  throws IOException;
 
-  /** Check whether the output specification for a job is appropriate.  Called
-   * when a job is submitted.  Typically checks that it does not already exist,
+  /** 
+   * Check for validity of the output-specification for the job.
+   *  
+   * <p>This is to validate the output specification for the job when
+   * a job is submitted.  Typically checks that it does not already exist,
    * throwing an exception when it already exists, so that output is not
-   * overwritten.
+   * overwritten.</p>
    *
-   * @param job the job whose output will be written
+   * @param ignored
+   * @param job job configuration.
    * @throws IOException when output should not be attempted
    */
   void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java Wed Oct 24 14:18:51 2007
@@ -21,18 +21,32 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Partitions the key space.  A partition is created for each reduce task. */
+/** 
+ * Partitions the key space.
+ * 
+ * <p><code>Partitioner</code> controls the partitioning of the keys of the 
+ * intermediate map-outputs. The key (or a subset of the key) is used to derive
+ * the partition, typically by a hash function. The total number of partitions
+ * is the same as the number of reduce tasks for the job. Hence this controls
+ * which of the <code>m</code> reduce tasks the intermediate key (and hence the 
+ * record) is sent to for reduction.</p>
+ * 
+ * @see Reducer
+ */
 public interface Partitioner<K2 extends WritableComparable,
                              V2 extends Writable>
   extends JobConfigurable {
   
-  /** Returns the paritition number for a given entry given the total number of
-   * partitions.  Typically a hash function on a all or a subset of the key.
+  /** 
+   * Get the partition number for a given key (hence record) given the total 
+   * number of partitions i.e. number of reduce-tasks for the job.
+   *   
+   * <p>Typically a hash function on all or a subset of the key.</p>
    *
-   * @param key the entry key
-   * @param value the entry value
-   * @param numPartitions the number of partitions
-   * @return the partition number
+   * @param key the key to be partitioned.
+   * @param value the entry value.
+   * @param numPartitions the total number of partitions.
+   * @return the partition number for the <code>key</code>.
    */
   int getPartition(K2 key, V2 value, int numPartitions);
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java Wed Oct 24 14:18:51 2007
@@ -24,11 +24,23 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Reads key/value pairs from an input file {@link FileSplit}.
- * Implemented by {@link InputFormat} implementations. */
+/**
+ * <code>RecordReader</code> reads &lt;key, value&gt; pairs from an 
+ * {@link InputSplit}.
+ *   
+ * <p><code>RecordReader</code>, typically, converts the byte-oriented view of 
+ * the input, provided by the <code>InputSplit</code>, and presents a 
+ * record-oriented view for the {@link Mapper} &amp; {@link Reducer} tasks for 
+ * processing. It thus assumes the responsibility of processing record 
+ * boundaries and presenting the tasks with keys and values.</p>
+ * 
+ * @see InputSplit
+ * @see InputFormat
+ */
 public interface RecordReader<K extends WritableComparable,
                               V extends Writable> {
-  /** Reads the next key/value pair.
+  /** 
+   * Reads the next key/value pair from the input for processing.
    *
    * @param key the key to read data into
    * @param value the value to read data into
@@ -40,25 +52,38 @@
   
   /**
    * Create an object of the appropriate type to be used as a key.
-   * @return a new key object
+   * 
+   * @return a new key object.
    */
   K createKey();
   
   /**
-   * Create an object of the appropriate type to be used as the value.
-   * @return a new value object
+   * Create an object of the appropriate type to be used as a value.
+   * 
+   * @return a new value object.
    */
   V createValue();
 
-  /** Returns the current position in the input. */
+  /** 
+   * Returns the current position in the input.
+   * 
+   * @return the current position in the input.
+   * @throws IOException
+   */
   long getPos() throws IOException;
 
-  /** Close this to future operations.*/ 
+  /** 
+   * Close this {@link RecordReader} to future operations.
+   * 
+   * @throws IOException
+   */ 
   public void close() throws IOException;
 
   /**
-   * How far has the reader gone through the input.
-   * @return progress from 0.0 to 1.0
+   * How much of the input has the {@link RecordReader} consumed, i.e.
+   * how much has been processed so far?
+   * 
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
    * @throws IOException
    */
   float getProgress() throws IOException;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java Wed Oct 24 14:18:51 2007
@@ -21,22 +21,36 @@
 import java.io.IOException;
 import java.io.DataOutput;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
-/** Writes key/value pairs to an output file.  Implemented by {@link
- * OutputFormat} implementations. */
+/**
+ * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs 
+ * to an output file.
+ 
+ * <p><code>RecordWriter</code> implementations write the job outputs to the
+ * {@link FileSystem}.</p>
+ * 
+ * @see OutputFormat
+ */
 public interface RecordWriter<K extends WritableComparable,
                               V extends Writable> {
-  /** Writes a key/value pair.
-   *
-   * @param key the key to write
-   * @param value the value to write
+  /** 
+   * Writes a key/value pair.
    *
+   * @param key the key to write.
+   * @param value the value to write.
+   * @throws IOException
    * @see Writable#write(DataOutput)
    */      
   void write(K key, V value) throws IOException;
 
-  /** Close this to future operations.*/ 
+  /** 
+   * Close this <code>RecordWriter</code> to future operations.
+   * 
+   * @param reporter facility to report progress.
+   * @throws IOException
+   */ 
   void close(Reporter reporter) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java Wed Oct 24 14:18:51 2007
@@ -22,24 +22,176 @@
 
 import java.util.Iterator;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Reduces a set of intermediate values which share a key to a smaller set of
- * values.  Input values are the grouped output of a {@link Mapper}. */
+/** 
+ * Reduces a set of intermediate values which share a key to a smaller set of
+ * values.  
+ * 
+ * <p>The number of <code>Reducer</code>s for the job is set by the user via 
+ * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations 
+ * can access the {@link JobConf} for the job via the 
+ * {@link JobConfigurable#configure(JobConf)} method and initialize themselves. 
+ * Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.</p>
+ *
+ * <p><code>Reducer</code> has 3 primary phases:</p>
+ * <ol>
+ *   <li>
+ *   
+ *   <h4 id="Shuffle">Shuffle</h4>
+ *   
+ *   <p>The <code>Reducer</code>'s input is the grouped output of a {@link Mapper}.
+ *   In this phase the framework, for each <code>Reducer</code>, fetches the 
+ *   relevant partition of the output of all the <code>Mapper</code>s, via HTTP. 
+ *   </p>
+ *   </li>
+ *   
+ *   <li>
+ *   <h4 id="Sort">Sort</h4>
+ *   
+ *   <p>The framework groups <code>Reducer</code> inputs by <code>key</code>s 
+ *   (since different <code>Mapper</code>s may have output the same key) in this
+ *   stage.</p>
+ *   
+ *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
+ *   being fetched they are merged.</p>
+ *      
+ *   <h5 id="SecondarySort">SecondarySort</h5>
+ *   
+ *   <p>If equivalence rules for keys while grouping the intermediates are 
+ *   different from those for grouping keys before reduction, then one may 
+ *   specify a <code>Comparator</code> via 
+ *   {@link JobConf#setOutputValueGroupingComparator(Class)}. Since 
+ *   {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to 
+ *   control how intermediate keys are grouped, these can be used in conjunction 
+ *   to simulate <i>secondary sort on values</i>.</p>
+ *   
+ *   
+ *   For example, say that you want to find duplicate web pages and tag them 
+ *   all with the url of the "best" known example. You would set up the job 
+ *   like:
+ *   <ul>
+ *     <li>Map Input Key: url</li>
+ *     <li>Map Input Value: document</li>
+ *     <li>Map Output Key: document checksum, url pagerank</li>
+ *     <li>Map Output Value: url</li>
+ *     <li>Partitioner: by checksum</li>
+ *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
+ *     <li>OutputValueGroupingComparator: by checksum</li>
+ *   </ul>
+ *   </li>
+ *   
+ *   <li>   
+ *   <h4 id="Reduce">Reduce</h4>
+ *   
+ *   <p>In this phase the 
+ *   {@link #reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+ *   method is called for each <code>&lt;key, (list of values)&gt;</code> pair in
+ *   the grouped inputs.</p>
+ *   <p>The output of the reduce task is typically written to the 
+ *   {@link FileSystem} via 
+ *   {@link OutputCollector#collect(WritableComparable, Writable)}.</p>
+ *   </li>
+ * </ol>
+ * 
+ * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ *     public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt; 
+ *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
+ *     
+ *       static enum MyCounters { NUM_RECORDS }
+ *        
+ *       private String reduceTaskId;
+ *       private int noKeys = 0;
+ *       
+ *       public void configure(JobConf job) {
+ *         reduceTaskId = job.get("mapred.task.id");
+ *       }
+ *       
+ *       public void reduce(K key, Iterator&lt;V&gt; values,
+ *                          OutputCollector&lt;K, V&gt; output, 
+ *                          Reporter reporter)
+ *       throws IOException {
+ *       
+ *         // Process
+ *         int noValues = 0;
+ *         while (values.hasNext()) {
+ *           V value = values.next();
+ *           
+ *           // Increment the no. of values for this key
+ *           ++noValues;
+ *           
+ *           // Process the &lt;key, value&gt; pair (assume this takes a while)
+ *           // ...
+ *           // ...
+ *           
+ *           // Let the framework know that we are alive, and kicking!
+ *           if ((noValues%10) == 0) {
+ *             reporter.progress();
+ *           }
+ *         
+ *           // Process some more
+ *           // ...
+ *           // ...
+ *           
+ *           // Output the &lt;key, value&gt; 
+ *           output.collect(key, value);
+ *         }
+ *         
+ *         // Increment the no. of &lt;key, list of values&gt; pairs processed
+ *         ++noKeys;
+ *         
+ *         // Increment counters
+ *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
+ *         
+ *         // Every 100 keys update application-level status
+ *         if ((noKeys%100) == 0) {
+ *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
+ *         }
+ *       }
+ *     }
+ * </pre></blockquote></p>
+ * 
+ * @see Mapper
+ * @see Partitioner
+ * @see Reporter
+ * @see MapReduceBase
+ */
 public interface Reducer<K2 extends WritableComparable, V2 extends Writable,
                          K3 extends WritableComparable, V3 extends Writable>
     extends JobConfigurable, Closeable {
   
-  /** Combines values for a given key.  Output values must be of the same type
-   * as input values.  Input keys must not be altered.  Typically all values
-   * are combined into zero or one value.  Output pairs are collected with
-   * calls to {@link OutputCollector#collect(WritableComparable,Writable)}.
+  /** 
+   * <i>Reduces</i> values for a given key.  
+   * 
+   * <p>The framework calls this method for each 
+   * <code>&lt;key, (list of values)&gt;</code> pair in the grouped inputs.
+   * Output values must be of the same type as input values.  Input keys must 
+   * not be altered.  Typically all values are combined into zero or one value.
+   * </p>
+   *   
+   * <p>Output pairs are collected with calls to  
+   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
    *
-   * @param key the key
-   * @param values the values to combine
-   * @param output to collect combined values
+   * <p>Applications can use the {@link Reporter} provided to report progress 
+   * or just indicate that they are alive. In scenarios where the application 
+   * takes a significant amount of time to process individual key/value 
+   * pairs, this is crucial since the framework might assume that the task has 
+   * timed-out and kill that task. The other way of avoiding this is to set 
+   * <a href="{@docRoot}/../hadoop-default.html#mapred.task.timeout">
+   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
+   * time-outs).</p>
+   * 
+   * @param key the key.
+   * @param values the list of values to reduce.
+   * @param output to collect keys and combined values.
+   * @param reporter facility to report progress.
    */
   void reduce(K2 key, Iterator<V2> values,
               OutputCollector<K3, V3> output, Reporter reporter)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Oct 24 14:18:51 2007
@@ -18,11 +18,24 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-
 import org.apache.hadoop.util.Progressable;
 
-/** Passed to application code to permit alteration of status. */
+/** 
+ * A facility for Map-Reduce applications to report progress and update 
+ * counters, status information etc.
+ * 
+ * <p>{@link Mapper} and {@link Reducer} can use the <code>Reporter</code>
+ * provided to report progress or just indicate that they are alive. In 
+ * scenarios where the application takes a significant amount of time to 
+ * process individual key/value pairs, this is crucial since the framework 
+ * might assume that the task has timed-out and kill that task.</p>
+ *
+ * <p>Applications can also update {@link Counters} via the provided 
+ * <code>Reporter</code>.</p>
+ * 
+ * @see Progressable
+ * @see Counters
+ */
 public interface Reporter extends Progressable {
   
   /**
@@ -41,25 +54,27 @@
     };
 
   /**
-   * Alter the application's status description.
+   * Set the status description for the task.
    * 
-   * @param status
-   *          a brief description of the current status
+   * @param status brief description of the current status.
    */
   public abstract void setStatus(String status);
   
   /**
    * Increments the counter identified by the key, which can be of
-   * any enum type, by the specified amount.
-   * @param key A value of any enum type
+   * any {@link Enum} type, by the specified amount.
+   * 
+   * @param key key to identify the counter to be incremented. The key can be
+   *            any <code>Enum</code>. 
    * @param amount A non-negative amount by which the counter is to 
-   * be incremented
+   *               be incremented.
    */
   public abstract void incrCounter(Enum key, long amount);
   
   /**
-   * Get the InputSplit object for a map.
-   * @return the input split that the map is reading from
+   * Get the {@link InputSplit} object for a map.
+   * 
+   * @return the <code>InputSplit</code> that the map is reading from.
    * @throws UnsupportedOperationException if called outside a mapper
    */
   public abstract InputSplit getInputSplit() 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Wed Oct 24 14:18:51 2007
@@ -21,78 +21,120 @@
 import java.io.*;
 
 /** 
- * Includes details on a running MapReduce job.  A client can
- * track a living job using this object.
+ * <code>RunningJob</code> is the user-interface to query for details on a 
+ * running Map-Reduce job.
+ * 
+ * <p>Clients can get hold of <code>RunningJob</code> via the {@link JobClient}
+ * and then query the running-job for details such as name, configuration, 
+ * progress etc.</p> 
+ * 
+ * @see JobClient
  */
 public interface RunningJob {
   /**
-   * Returns an identifier for the job
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
    */
   public String getJobID();
   
   /**
-   * Returns the name of the job
+   * Get the name of the job.
+   * 
+   * @return the name of the job.
    */
   public String getJobName();
 
   /**
-   * Returns the path of the submitted job.
+   * Get the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
    */
   public String getJobFile();
 
   /**
-   * Returns a URL where some job progress information will be displayed.
+   * Get the URL where some job progress information will be displayed.
+   * 
+   * @return the URL where some job progress information will be displayed.
    */
   public String getTrackingURL();
 
   /**
-   * Returns a float between 0.0 and 1.0, indicating progress on
-   * the map portion of the job.  When all map tasks have completed,
-   * the function returns 1.0.
+   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
+   * and 1.0.  When all map tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's map-tasks.
+   * @throws IOException
    */
   public float mapProgress() throws IOException;
 
   /**
-   * Returns a float between 0.0 and 1.0, indicating progress on
-   * the reduce portion of the job.  When all reduce tasks have completed,
-   * the function returns 1.0.
+   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
+   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's reduce-tasks.
+   * @throws IOException
    */
   public float reduceProgress() throws IOException;
 
   /**
-   * Non-blocking function to check whether the job is finished or not.
+   * Check if the job is finished or not. 
+   * This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
    */
   public boolean isComplete() throws IOException;
 
   /**
-   * True iff job completed successfully.
+   * Check if the job completed successfully. 
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
    */
   public boolean isSuccessful() throws IOException;
 
   /**
    * Blocks until the job is complete.
+   * 
+   * @throws IOException
    */
   public void waitForCompletion() throws IOException;
 
   /**
    * Kill the running job.  Blocks until all job tasks have been
    * killed as well.  If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
    */
   public void killJob() throws IOException;
     
-  public TaskCompletionEvent[] getTaskCompletionEvents(
-                                                       int startFrom) throws IOException;
+  /**
+   * Get events indicating completion (success/failure) of component tasks.
+   *  
+   * @param startFrom index to start fetching events from
+   * @return an array of {@link TaskCompletionEvent}s
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) 
+  throws IOException;
   
   /**
    * Kill indicated task attempt.
-   * @param taskId the id of the task to kill.
-   * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
-   * it is just killed, w/o affecting job failure status.  
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @param shouldFail if true the task is failed and added to failed tasks 
+   *                   list, otherwise it is just killed, w/o affecting 
+   *                   job failure status.  
+   * @throws IOException
    */
   public void killTask(String taskId, boolean shouldFail) throws IOException;
     
   /**
    * Gets the counters for this job.
+   * 
+   * @return the counters for this job.
+   * @throws IOException
    */
   public Counters getCounters() throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html?rev=588033&r1=588032&r2=588033&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html Wed Oct 24 14:18:51 2007
@@ -1,16 +1,212 @@
 <html>
 <body>
 
-<p>A system for scalable, fault-tolerant, distributed computation over
-large data collections.</p>
+<p>A software framework for easily writing applications which process vast 
+amounts of data (multi-terabyte data-sets) in parallel on large clusters 
+(thousands of nodes) built of commodity hardware in a reliable, fault-tolerant 
+manner.</p>
 
-<p>Applications implement {@link org.apache.hadoop.mapred.Mapper} and
-{@link org.apache.hadoop.mapred.Reducer} interfaces.  These are submitted
-as a {@link org.apache.hadoop.mapred.JobConf} and are applied to data
-stored in a {@link org.apache.hadoop.fs.FileSystem}.</p>
+<p>A Map-Reduce <i>job</i> usually splits the input data-set into independent 
+chunks which are processed by <i>map</i> tasks in a completely parallel manner, 
+followed by <i>reduce</i> tasks which aggregate their output. Typically both 
+the input and the output of the job are stored in a 
+{@link org.apache.hadoop.fs.FileSystem}. The framework takes care of monitoring 
+tasks and re-executing failed ones. Since, usually, the compute nodes and the 
+storage nodes are the same i.e. Hadoop's Map-Reduce framework and Distributed 
+FileSystem are running on the same set of nodes, tasks are effectively scheduled 
+on the nodes where data is already present, resulting in very high aggregate 
+bandwidth across the cluster.</p>
 
-<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's
-original Map/Reduce paper</a> for background information.</p>
+<p>The Map-Reduce framework operates exclusively on <tt>&lt;key, value&gt;</tt> 
+pairs i.e. the input to the job is viewed as a set of <tt>&lt;key, value&gt;</tt>
+pairs and the output as another, possibly different, set of 
+<tt>&lt;key, value&gt;</tt> pairs. The <tt>key</tt>s and <tt>value</tt>s have to 
+be serializable as {@link org.apache.hadoop.io.Writable}s and additionally the
+<tt>key</tt>s have to be {@link org.apache.hadoop.io.WritableComparable}s in 
+order to facilitate grouping by the framework.</p>
+
+<p>Data flow:</p>
+<pre>
+                                (input)
+                                <tt>&lt;k1, v1&gt;</tt>
+       
+                                   |
+                                   V
+       
+                                  <b>map</b>
+       
+                                   |
+                                   V
+
+                                <tt>&lt;k2, v2&gt;</tt>
+       
+                                   |
+                                   V
+       
+                                <b>combine</b>
+       
+                                   |
+                                   V
+       
+                                <tt>&lt;k2, v2&gt;</tt>
+       
+                                   |
+                                   V
+       
+                                 <b>reduce</b>
+       
+                                   |
+                                   V
+       
+                                <tt>&lt;k3, v3&gt;</tt>
+                                (output)
+</pre>
+
+<p>Applications typically implement 
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} 
+and
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} 
+methods.  The application-writer also specifies various facets of the job such
+as input and output locations, the <tt>Partitioner</tt>, <tt>InputFormat</tt> 
+&amp; <tt>OutputFormat</tt> implementations to be used etc. as 
+a {@link org.apache.hadoop.mapred.JobConf}. The client program, 
+{@link org.apache.hadoop.mapred.JobClient}, then submits the job to the framework 
+and optionally monitors it.</p>
+
+<p>The framework spawns one map task per 
+{@link org.apache.hadoop.mapred.InputSplit} generated by the 
+{@link org.apache.hadoop.mapred.InputFormat} of the job and calls 
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} 
+with each &lt;key, value&gt; pair read by the 
+{@link org.apache.hadoop.mapred.RecordReader} from the <tt>InputSplit</tt> for 
+the task. The intermediate outputs of the maps are then grouped by <tt>key</tt>s
+and optionally aggregated by a <i>combiner</i>. The key space of intermediate 
+outputs is partitioned by the {@link org.apache.hadoop.mapred.Partitioner}, where 
+the number of partitions is exactly the number of reduce tasks for the job.</p>
+
+<p>The reduce tasks fetch the sorted intermediate outputs of the maps, via http, 
+merge the &lt;key, value&gt; pairs and call 
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} 
+for each &lt;key, list of values&gt; pair. The output of the reduce tasks is 
+stored on the <tt>FileSystem</tt> by the 
+{@link org.apache.hadoop.mapred.RecordWriter} provided by the
+{@link org.apache.hadoop.mapred.OutputFormat} of the job.</p>
+
+<p>Map-Reduce application to perform a distributed <i>grep</i>:</p>
+<pre><tt>
+public class Grep extends Configured implements Tool {
+
+  // <i>map: Search for the pattern specified by 'grep.mapper.regex' &amp;</i>
+  //      <i>'grep.mapper.regex.group'</i>
+
+  static class GrepMapper&lt;K extends WritableComparable&gt; 
+  extends MapReduceBase  implements Mapper&lt;K, Text, Text, LongWritable&gt; {
+
+    private Pattern pattern;
+    private int group;
+
+    public void configure(JobConf job) {
+      pattern = Pattern.compile(job.get("grep.mapper.regex"));
+      group = job.getInt("grep.mapper.regex.group", 0);
+    }
+
+    public void map(K key, Text value,
+                    OutputCollector&lt;Text, LongWritable&gt; output,
+                    Reporter reporter)
+    throws IOException {
+      String text = value.toString();
+      Matcher matcher = pattern.matcher(text);
+      while (matcher.find()) {
+        output.collect(new Text(matcher.group(group)), new LongWritable(1));
+      }
+    }
+  }
+
+  // <i>reduce: Count the number of occurrences of the pattern</i>
+
+  static class GrepReducer&lt;K extends WritableComparable&gt; extends MapReduceBase
+  implements Reducer&lt;K, LongWritable, K, LongWritable&gt; {
+
+    public void reduce(K key, Iterator&lt;LongWritable&gt; values,
+                       OutputCollector&lt;K, LongWritable&gt; output,
+                       Reporter reporter)
+    throws IOException {
+
+      // sum all values for this key
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+
+      // output sum
+      output.collect(key, new LongWritable(sum));
+    }
+  }
+  
+  public int run(String[] args) throws Exception {
+    if (args.length &lt; 3) {
+      System.out.println("Grep &lt;inDir&gt; &lt;outDir&gt; &lt;regex&gt; [&lt;group&gt;]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    JobConf grepJob = new JobConf(getConf(), Grep.class);
+    
+    grepJob.setJobName("grep");
+
+    grepJob.setInputPath(new Path(args[0]));
+    grepJob.setOutputPath(new Path(args[1]));
+
+    grepJob.setMapperClass(GrepMapper.class);
+    grepJob.setCombinerClass(GrepReducer.class);
+    grepJob.setReducerClass(GrepReducer.class);
+
+    grepJob.set("grep.mapper.regex", args[2]);
+    if (args.length == 4)
+      grepJob.set("grep.mapper.regex.group", args[3]);
+
+    grepJob.setOutputFormat(SequenceFileOutputFormat.class);
+    grepJob.setOutputKeyClass(Text.class);
+    grepJob.setOutputValueClass(LongWritable.class);
+
+    JobClient.runJob(grepJob);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Grep(), args);
+    System.exit(res);
+  }
+
+}
+</tt></pre>
+
+<p>Notice how the data-flow of the above grep job is very similar to doing the
+same via the unix pipeline:</p>
+
+<pre>
+cat input/*   |   grep   |   sort    |   uniq -c   &gt;   out
+</pre>
+
+<pre>
+      input   |    map   |  shuffle  |   reduce    &gt;   out
+</pre>
+
+<p>Hadoop Map-Reduce applications need not be written in 
+Java<small><sup>TM</sup></small> only. 
+<a href="../streaming/package-summary.html">Hadoop Streaming</a> is a utility
+which allows users to create and run jobs with any executables (e.g. shell 
+utilities) as the mapper and/or the reducer. 
+<a href="pipes/package-summary.html">Hadoop Pipes</a> is a 
+<a href="http://www.swig.org/">SWIG</a>-compatible <em>C++ API</em> to implement
+Map-Reduce applications (non JNI<small><sup>TM</sup></small> based).</p>
+
+<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's original 
+Map/Reduce paper</a> for background information.</p>
+
+<p><i>Java and JNI are trademarks or registered trademarks of 
+Sun Microsystems, Inc. in the United States and other countries.</i></p>
 
 </body>
 </html>



Mime
View raw message