Subject: svn commit: r588033 [2/3] - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/... 
Date: Wed, 24 Oct 2007 21:18:53 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071024211900.EB7721A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Oct 24 14:18:51 2007 @@ -31,6 +31,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; @@ -43,11 +45,63 @@ import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.HashPartitioner; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; -/** A map/reduce job configuration. This names the {@link Mapper}, combiner - * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat}, and - * {@link OutputFormat} implementations to be used. It also indicates the set - * of input files, and where the output files should be written. */ +/** + * A map/reduce job configuration. + * + *

JobConf is the primary interface for a user to describe a + * map-reduce job to the Hadoop framework for execution. The framework tries to + * faithfully execute the job as described by JobConf, however: + *

    + *
  1. + * Some configuration parameters might have been marked as + * + * final by administrators and hence cannot be altered. + *
  2. + * While some job parameters are straightforward to set + * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly with the + * rest of the framework and/or job-configuration and are relatively more + * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}). + *

+ * + *

JobConf typically specifies the {@link Mapper}, combiner + * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and + * {@link OutputFormat} implementations to be used etc. It also indicates the + * set of input files ({@link #setInputPath(Path)}/{@link #addInputPath(Path)}), + * and where the output files should be written ({@link #setOutputPath(Path)}). + * + *

Optionally JobConf is used to specify other advanced facets + * of the job such as Comparators to be used, files to be put in + * the {@link DistributedCache}, whether or not intermediate and/or job outputs + * are to be compressed (and how) etc.

+ * + *

Here is an example of how to configure a job via JobConf:

+ *

+ *     // Create a new JobConf
+ *     JobConf job = new JobConf(new Configuration(), MyJob.class);
+ *     
+ *     // Specify various job-specific parameters     
+ *     job.setJobName("myjob");
+ *     
+ *     job.setInputPath(new Path("in"));
+ *     job.setOutputPath(new Path("out"));
+ *     
+ *     job.setMapperClass(MyJob.MyMapper.class);
+ *     job.setCombinerClass(MyJob.MyReducer.class);
+ *     job.setReducerClass(MyJob.MyReducer.class);
+ *     
+ *     job.setInputFormat(SequenceFileInputFormat.class);
+ *     job.setOutputFormat(SequenceFileOutputFormat.class);
+ * 

+ * + * @see JobClient + * @see ClusterStatus + * @see Tool + * @see DistributedCache + */ public class JobConf extends Configuration { private static final Log LOG = LogFactory.getLog(JobConf.class); @@ -61,6 +115,7 @@ /** * Construct a map/reduce job configuration. + * * @param exampleClass a class whose containing jar is used as the job's jar. */ public JobConf(Class exampleClass) { @@ -92,7 +147,7 @@ /** Construct a map/reduce configuration. * - * @param config a Configuration-format XML job description file + * @param config a Configuration-format XML job description file. */ public JobConf(String config) { this(new Path(config)); @@ -100,7 +155,7 @@ /** Construct a map/reduce configuration. * - * @param config a Configuration-format XML job description file + * @param config a Configuration-format XML job description file. */ public JobConf(Path config) { super(); @@ -111,6 +166,7 @@ /** * Checks if mapred-default.xml is on the CLASSPATH, if so * it warns the user and loads it as a {@link Configuration} resource. + * * @deprecated Remove in hadoop-0.16.0 via HADOOP-1843 */ private void checkWarnAndLoadMapredDefault() { @@ -122,12 +178,24 @@ } } + /** + * Get the user jar for the map-reduce job. + * + * @return the user jar for the map-reduce job. + */ public String getJar() { return get("mapred.jar"); } + + /** + * Set the user jar for the map-reduce job. + * + * @param jar the user jar for the map-reduce job. + */ public void setJar(String jar) { set("mapred.jar", jar); } /** * Set the job's jar file by finding an example class location. - * @param cls the example class + * + * @param cls the example class. */ public void setJarByClass(Class cls) { String jar = findContainingJar(cls); @@ -136,6 +204,11 @@ } } + /** + * Get the system directory where job-specific files are to be placed. + * + * @return the system directory where job-specific files are to be placed. 
+ */ public Path getSystemDir() { return new Path(get("mapred.system.dir", "/tmp/hadoop/mapred/system")); } @@ -158,23 +231,41 @@ } } - /** Constructs a local file name. Files are distributed among configured - * local directories.*/ + /** + * Constructs a local file name. Files are distributed among configured + * local directories. + */ public Path getLocalPath(String pathString) throws IOException { return getLocalPath("mapred.local.dir", pathString); } + /** + * Set the {@link Path} of the input directory for the map-reduce job. + * + * @param dir the {@link Path} of the input directory for the map-reduce job. + */ public void setInputPath(Path dir) { dir = new Path(getWorkingDirectory(), dir); set("mapred.input.dir", dir.toString()); } + /** + * Add a {@link Path} to the list of inputs for the map-reduce job. + * + * @param dir {@link Path} to be added to the list of inputs for + * the map-reduce job. + */ public void addInputPath(Path dir) { dir = new Path(getWorkingDirectory(), dir); String dirs = get("mapred.input.dir"); set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir); } + /** + * Get the list of input {@link Path}s for the map-reduce job. + * + * @return the list of input {@link Path}s for the map-reduce job. + */ public Path[] getInputPaths() { String dirs = get("mapred.input.dir", ""); ArrayList list = Collections.list(new StringTokenizer(dirs, ",")); @@ -187,6 +278,7 @@ /** * Get the reported username for this job. + * * @return the username */ public String getUser() { @@ -195,7 +287,8 @@ /** * Set the reported username for this job. - * @param user the username + * + * @param user the username for this job. */ public void setUser(String user) { set("user.name", user); @@ -206,6 +299,10 @@ /** * Set whether the framework should keep the intermediate files for * failed tasks. + * + * @param keep true if framework should keep the intermediate files + * for failed tasks, false otherwise. 
+ * */ public void setKeepFailedTaskFiles(boolean keep) { setBoolean("keep.failed.task.files", keep); @@ -213,6 +310,7 @@ /** * Should the temporary files for failed tasks be kept? + * * @return should the files be kept? */ public boolean getKeepFailedTaskFiles() { @@ -223,6 +321,7 @@ * Set a regular expression for task names that should be kept. * The regular expression ".*_m_000123_0" would keep the files * for the first instance of map 123 that ran. + * * @param pattern the java.util.regex.Pattern to match against the * task names. */ @@ -233,15 +332,17 @@ /** * Get the regular expression that is matched against the task names * to see if we need to keep the files. - * @return the pattern as a string, if it was set, othewise null + * + * @return the pattern as a string, if it was set, othewise null. */ public String getKeepTaskFilesPattern() { return get("keep.task.files.pattern"); } /** - * Set the current working directory for the default file system - * @param dir the new current working directory + * Set the current working directory for the default file system. + * + * @param dir the new current working directory. */ public void setWorkingDirectory(Path dir) { dir = new Path(getWorkingDirectory(), dir); @@ -250,7 +351,8 @@ /** * Get the current working directory for the default file system. - * @return the directory name + * + * @return the directory name. */ public Path getWorkingDirectory() { String name = get("mapred.working.dir"); @@ -267,31 +369,107 @@ } } + /** + * Get the {@link Path} to the output directory for the map-reduce job. + * + *

Tasks' Side-Effect Files

+ * + *

Some applications need to create/write-to side-files, which differ from + * the actual job-outputs. + * + *

In such cases there could be issues with 2 instances of the same TIP + * (running simultaneously e.g. speculative tasks) trying to open/write-to the + * same file (path) on HDFS. Hence the application-writer will have to pick + * unique names per task-attempt (e.g. using the taskid, say + * task_200709221812_0001_m_000000_0), not just per TIP.

+ * + *

To get around this the Map-Reduce framework helps the application-writer + * out by maintaining a special ${mapred.output.dir}/_${taskid} + * sub-directory for each task-attempt on HDFS where the output of the + * task-attempt goes. On successful completion of the task-attempt the files + * in the ${mapred.output.dir}/_${taskid} (only) + * are promoted to ${mapred.output.dir}. Of course, the + * framework discards the sub-directory of unsuccessful task-attempts. This + * is completely transparent to the application.

+ * + *

The application-writer can take advantage of this by creating any + * side-files required in ${mapred.output.dir} during execution of the + * reduce-task i.e. via {@link #getOutputPath()}, and the framework will move + * them out similarly - thus the application-writer doesn't have to pick unique paths per + * task-attempt.

+ * + *

Note: the value of ${mapred.output.dir} during execution + * of a particular task-attempt is actually + * ${mapred.output.dir}/_${taskid}, not the value set by + * {@link #setOutputPath(Path)}. So, just create any side-files in the path + * returned by {@link #getOutputPath()} from the map/reduce task to take + * advantage of this feature.

+ * + *

The entire discussion holds true for maps of jobs with + * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + * goes directly to HDFS.
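The promotion of side-effect files described above can be sketched outside Hadoop. The following is only an illustration under stated assumptions: it uses the local filesystem through java.nio.file as a stand-in for HDFS (so Path here is java.nio.file.Path, not org.apache.hadoop.fs.Path), and the class and method names (SideEffectFilesSketch, attemptDir, promote, demo) are hypothetical, not framework APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: mimics the per-attempt sub-directory scheme on a local disk.
final class SideEffectFilesSketch {

    /** Per-attempt directory, i.e. <outputDir>/_<taskId>. */
    static Path attemptDir(Path outputDir, String taskId) {
        return outputDir.resolve("_" + taskId);
    }

    /** On success: move every file of the attempt up into outputDir, then drop the sub-directory. */
    static void promote(Path outputDir, String taskId) {
        Path attempt = attemptDir(outputDir, taskId);
        try {
            List<Path> files;
            try (Stream<Path> listing = Files.list(attempt)) {
                files = listing.collect(Collectors.toList());
            }
            for (Path file : files) {
                Files.move(file, outputDir.resolve(file.getFileName()),
                           StandardCopyOption.REPLACE_EXISTING);
            }
            Files.delete(attempt);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs one successful attempt end to end; returns true if the side-file was promoted. */
    static boolean demo() {
        try {
            Path out = Files.createTempDirectory("mapred-output");
            String taskId = "task_200709221812_0001_m_000000_0";
            Path attempt = Files.createDirectory(attemptDir(out, taskId));
            // The task writes its side-file into the attempt directory, not into 'out'.
            Files.write(attempt.resolve("side-file.txt"), "hello".getBytes(StandardCharsets.UTF_8));
            promote(out, taskId);
            boolean promoted = Files.exists(out.resolve("side-file.txt")) && !Files.exists(attempt);
            // Clean up the temporary directory used by this demo.
            Files.deleteIfExists(out.resolve("side-file.txt"));
            Files.deleteIfExists(out);
            return promoted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("side-file promoted: " + demo());
    }
}
```

Because two attempts of the same TIP get different task ids, they write into different sub-directories and never collide on the same path; only the attempt that succeeds is promoted.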

+ * + * @return the {@link Path} to the output directory for the map-reduce job. + */ public Path getOutputPath() { String name = get("mapred.output.dir"); return name == null ? null: new Path(name); } + /** + * Set the {@link Path} of the output directory for the map-reduce job. + * + *

Note: + *

+ * @param dir the {@link Path} of the output directory for the map-reduce job. + */ public void setOutputPath(Path dir) { dir = new Path(getWorkingDirectory(), dir); set("mapred.output.dir", dir.toString()); } + /** + * Get the {@link InputFormat} implementation for the map-reduce job, + * defaults to {@link TextInputFormat} if not specified explicity. + * + * @return the {@link InputFormat} implementation for the map-reduce job. + */ public InputFormat getInputFormat() { return (InputFormat)ReflectionUtils.newInstance(getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), this); } + + /** + * Set the {@link InputFormat} implementation for the map-reduce job. + * + * @param theClass the {@link InputFormat} implementation for the map-reduce + * job. + */ public void setInputFormat(Class theClass) { setClass("mapred.input.format.class", theClass, InputFormat.class); } + + /** + * Get the {@link OutputFormat} implementation for the map-reduce job, + * defaults to {@link TextOutputFormat} if not specified explicity. + * + * @return the {@link OutputFormat} implementation for the map-reduce job. + */ public OutputFormat getOutputFormat() { return (OutputFormat)ReflectionUtils.newInstance(getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), this); } + + /** + * Set the {@link OutputFormat} implementation for the map-reduce job. + * + * @param theClass the {@link OutputFormat} implementation for the map-reduce + * job. + */ public void setOutputFormat(Class theClass) { setClass("mapred.output.format.class", theClass, OutputFormat.class); } @@ -320,6 +498,7 @@ /** * Should the map outputs be compressed before transfer? * Uses the SequenceFile compression. + * * @param compress should the map outputs be compressed? */ public void setCompressMapOutput(boolean compress) { @@ -328,8 +507,9 @@ /** * Are the outputs of the maps be compressed? 
+ * * @return true if the outputs of the maps are to be compressed, - * false otherwise + * false otherwise. */ public boolean getCompressMapOutput() { return getBoolean("mapred.compress.map.output", false); @@ -337,8 +517,9 @@ /** * Set the {@link CompressionType} for the map outputs. + * * @param style the {@link CompressionType} to control how the map outputs - * are compressed + * are compressed. */ public void setMapOutputCompressionType(CompressionType style) { set("mapred.map.output.compression.type", style.toString()); @@ -346,8 +527,9 @@ /** * Get the {@link CompressionType} for the map outputs. + * * @return the {@link CompressionType} for map outputs, defaulting to - * {@link CompressionType#RECORD} + * {@link CompressionType#RECORD}. */ public CompressionType getMapOutputCompressionType() { String val = get("mapred.map.output.compression.type", @@ -357,8 +539,9 @@ /** * Set the given class as the {@link CompressionCodec} for the map outputs. + * * @param codecClass the {@link CompressionCodec} class that will compress - * the map outputs + * the map outputs. */ public void setMapOutputCompressorClass(Class codecClass) { @@ -368,9 +551,10 @@ /** * Get the {@link CompressionCodec} for compressing the map outputs. + * * @param defaultValue the {@link CompressionCodec} to return if not set * @return the {@link CompressionCodec} class that should be used to compress the - * map outputs + * map outputs. * @throws IllegalArgumentException if the class was specified, but not found */ public Class @@ -390,10 +574,10 @@ /** * Get the key class for the map output data. If it is not set, use the - * (final) output ket class This allows the map output key class to be - * different than the final output key class - * - * @return map output key class + * (final) output key class. This allows the map output key class to be + * different than the final output key class. + * + * @return the map output key class. 
*/ public Class getMapOutputKeyClass() { Class retv = getClass("mapred.mapoutput.key.class", null, @@ -407,7 +591,9 @@ /** * Set the key class for the map output data. This allows the user to * specify the map output key class to be different than the final output - * value class + * value class. + * + * @param theClass the map output key class. */ public void setMapOutputKeyClass(Class theClass) { setClass("mapred.mapoutput.key.class", theClass, @@ -417,9 +603,9 @@ /** * Get the value class for the map output data. If it is not set, use the * (final) output value class This allows the map output value class to be - * different than the final output value class - * - * @return map output value class + * different than the final output value class. + * + * @return the map output value class. */ public Class getMapOutputValueClass() { Class retv = getClass("mapred.mapoutput.value.class", null, @@ -433,21 +619,38 @@ /** * Set the value class for the map output data. This allows the user to * specify the map output value class to be different than the final output - * value class + * value class. + * + * @param theClass the map output value class. */ public void setMapOutputValueClass(Class theClass) { setClass("mapred.mapoutput.value.class", theClass, Writable.class); } + /** + * Get the key class for the job output data. + * + * @return the key class for the job output data. + */ public Class getOutputKeyClass() { return getClass("mapred.output.key.class", LongWritable.class, WritableComparable.class); } + /** + * Set the key class for the job output data. + * + * @param theClass the key class for the job output data. + */ public void setOutputKeyClass(Class theClass) { setClass("mapred.output.key.class", theClass, WritableComparable.class); } + /** + * Get the {@link WritableComparable} comparator used to compare keys. + * + * @return the {@link WritableComparable} comparator used to compare keys. 
+ */ public WritableComparator getOutputKeyComparator() { Class theClass = getClass("mapred.output.key.comparator.class", null, WritableComparator.class); @@ -456,17 +659,24 @@ return WritableComparator.get(getMapOutputKeyClass()); } + /** + * Set the {@link WritableComparable} comparator used to compare keys. + * + * @param theClass the {@link WritableComparable} comparator used to + * compare keys. + * @see #setOutputValueGroupingComparator(Class) + */ public void setOutputKeyComparatorClass(Class theClass) { setClass("mapred.output.key.comparator.class", theClass, WritableComparator.class); } - /** Get the user defined comparator for grouping values. + /** + * Get the user defined {@link WritableComparable} comparator for + * grouping keys of inputs to the reduce. * - * This call is used to get the comparator for grouping values by key. - * @see #setOutputValueGroupingComparator(Class) for details. - * - * @return Comparator set by the user for grouping values. + * @return comparator set by the user for grouping values. + * @see #setOutputValueGroupingComparator(Class) for details. */ public WritableComparator getOutputValueGroupingComparator() { Class theClass = getClass("mapred.output.value.groupfn.class", null, @@ -478,120 +688,310 @@ return (WritableComparator)ReflectionUtils.newInstance(theClass, this); } - /** Set the user defined comparator for grouping values. + /** + * Set the user defined {@link WritableComparable} comparator for + * grouping keys in the input to the reduce. * - * For key-value pairs (K1,V1) and (K2,V2), the values are passed - * in a single call to the map function if K1 and K2 compare as equal. + *

This comparator should be provided if the equivalence rules for keys + * for sorting the intermediates are different from those for grouping keys + * before each call to + * {@link Reducer#reduce(WritableComparable, java.util.Iterator, OutputCollector, Reporter)}.

+ * + *

For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + * in a single call to the reduce function if K1 and K2 compare as equal.

* - * This comparator should be provided if the equivalence rules for keys - * for sorting the intermediates are different from those for grouping - * values. + *

Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + * how keys are sorted, this can be used in conjunction to simulate + * secondary sort on values.

+ * + *

Note: This is not a guarantee of the reduce sort being + * stable in any sense. (In any case, with the order of available + * map-outputs to the reduce being non-deterministic, it wouldn't make + * that much sense.)
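The interplay between the sort comparator and the grouping comparator can be illustrated without Hadoop. This is a minimal sketch, assuming a composite key made of a natural part and a secondary part; the names (GroupingSketch, Key, Pair, SORT, GROUP, reduceCalls) are hypothetical, and plain java.util.Comparator stands in for WritableComparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sort on the full key, group on part of it.
final class GroupingSketch {

    /** Composite key: the natural key plus a secondary field used only for ordering. */
    static final class Key {
        final String natural;
        final int secondary;
        Key(String natural, int secondary) {
            this.natural = natural;
            this.secondary = secondary;
        }
    }

    /** One intermediate (key, value) pair as emitted by a map. */
    static final class Pair {
        final Key key;
        final String value;
        Pair(Key key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Plays the role of the output key comparator: orders by natural key, then secondary field.
    static final Comparator<Key> SORT =
        Comparator.comparing((Key k) -> k.natural).thenComparingInt(k -> k.secondary);

    // Plays the role of the grouping comparator: keys with equal natural parts compare as equal.
    static final Comparator<Key> GROUP = Comparator.comparing((Key k) -> k.natural);

    /** Returns the values seen by each simulated reduce call, in call order. */
    static List<List<String>> reduceCalls(List<Pair> mapOutput) {
        List<Pair> sorted = new ArrayList<>(mapOutput);
        sorted.sort((a, b) -> SORT.compare(a.key, b.key));
        List<List<String>> calls = new ArrayList<>();
        Key current = null;
        for (Pair p : sorted) {
            // Start a new reduce call whenever the grouping comparator sees a different key.
            if (current == null || GROUP.compare(current, p.key) != 0) {
                calls.add(new ArrayList<>());
                current = p.key;
            }
            calls.get(calls.size() - 1).add(p.value);
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Pair> mapOutput = new ArrayList<>();
        mapOutput.add(new Pair(new Key("b", 2), "b2"));
        mapOutput.add(new Pair(new Key("a", 2), "a2"));
        mapOutput.add(new Pair(new Key("a", 1), "a1"));
        mapOutput.add(new Pair(new Key("b", 1), "b1"));
        System.out.println(reduceCalls(mapOutput));
    }
}
```

Each inner list corresponds to one reduce call; within it the values arrive ordered by the secondary field, which is the simulated secondary sort on values.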

* - * @param theClass The Comparator class to be used for grouping. It should - * extend WritableComparator. + * @param theClass the comparator class to be used for grouping keys. + * It should extend WritableComparator. + * @see #setOutputKeyComparatorClass(Class) */ public void setOutputValueGroupingComparator(Class theClass) { setClass("mapred.output.value.groupfn.class", theClass, WritableComparator.class); } + /** + * Get the value class for job outputs. + * + * @return the value class for job outputs. + */ public Class getOutputValueClass() { return getClass("mapred.output.value.class", Text.class, Writable.class); } + + /** + * Set the value class for job outputs. + * + * @param theClass the value class for job outputs. + */ public void setOutputValueClass(Class theClass) { setClass("mapred.output.value.class", theClass, Writable.class); } - + /** + * Get the {@link Mapper} class for the job. + * + * @return the {@link Mapper} class for the job. + */ public Class getMapperClass() { return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); } + + /** + * Set the {@link Mapper} class for the job. + * + * @param theClass the {@link Mapper} class for the job. + */ public void setMapperClass(Class theClass) { setClass("mapred.mapper.class", theClass, Mapper.class); } + /** + * Get the {@link MapRunnable} class for the job. + * + * @return the {@link MapRunnable} class for the job. + */ public Class getMapRunnerClass() { return getClass("mapred.map.runner.class", MapRunner.class, MapRunnable.class); } + + /** + * Expert: Set the {@link MapRunnable} class for the job. + * + * Typically used to exert greater control on {@link Mapper}s. + * + * @param theClass the {@link MapRunnable} class for the job. + */ public void setMapRunnerClass(Class theClass) { setClass("mapred.map.runner.class", theClass, MapRunnable.class); } + /** + * Get the {@link Partitioner} used to partition {@link Mapper}-outputs + * to be sent to the {@link Reducer}s. 
+ * + * @return the {@link Partitioner} used to partition map-outputs. + */ public Class getPartitionerClass() { return getClass("mapred.partitioner.class", HashPartitioner.class, Partitioner.class); } + + /** + * Set the {@link Partitioner} class used to partition + * {@link Mapper}-outputs to be sent to the {@link Reducer}s. + * + * @param theClass the {@link Partitioner} used to partition map-outputs. + */ public void setPartitionerClass(Class theClass) { setClass("mapred.partitioner.class", theClass, Partitioner.class); } + /** + * Get the {@link Reducer} class for the job. + * + * @return the {@link Reducer} class for the job. + */ public Class getReducerClass() { return getClass("mapred.reducer.class", IdentityReducer.class, Reducer.class); } + + /** + * Set the {@link Reducer} class for the job. + * + * @param theClass the {@link Reducer} class for the job. + */ public void setReducerClass(Class theClass) { setClass("mapred.reducer.class", theClass, Reducer.class); } + /** + * Get the user-defined combiner class used to combine map-outputs + * before being sent to the reducers. Typically the combiner is same as the + * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. + * + * @return the user-defined combiner class used to combine map-outputs. + */ public Class getCombinerClass() { return getClass("mapred.combiner.class", null, Reducer.class); } + + /** + * Set the user-defined combiner class used to combine map-outputs + * before being sent to the reducers. + * + *

The combiner is a task-level aggregation operation which, in some cases, + * helps to cut down the amount of data transferred from the {@link Mapper} to + * the {@link Reducer}, leading to better performance.

+ * + *

Typically the combiner is the same as the Reducer for the + * job i.e. {@link #setReducerClass(Class)}.

+ * + * @param theClass the user-defined combiner class used to combine + * map-outputs. + */ public void setCombinerClass(Class theClass) { setClass("mapred.combiner.class", theClass, Reducer.class); } /** - * Should speculative execution be used for this job? - * @return Defaults to true + * Should speculative execution be used for this job? + * Defaults to true. + * + * @return true if speculative execution be used for this job, + * false otherwise. */ public boolean getSpeculativeExecution() { return getBoolean("mapred.speculative.execution", true); } /** - * Turn on or off speculative execution for this job. - * In general, it should be turned off for map jobs that have side effects. + * Turn speculative execution on or off for this job. + * + * @param speculativeExecution true if speculative execution + * should be turned on, else false. */ - public void setSpeculativeExecution(boolean new_val) { - setBoolean("mapred.speculative.execution", new_val); + public void setSpeculativeExecution(boolean speculativeExecution) { + setBoolean("mapred.speculative.execution", speculativeExecution); } + /** + * Get configured the number of reduce tasks for this job. + * Defaults to 1. + * + * @return the number of reduce tasks for this job. + */ public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); } + + /** + * Set the number of map tasks for this job. + * + *

Note: This is only a hint to the framework. The actual + * number of spawned map tasks depends on the number of {@link InputSplit}s + * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. + * + * A custom {@link InputFormat} is typically used to accurately control + * the number of map tasks for the job.

+ * + *

How many maps?

+ * + *

The number of maps is usually driven by the total size of the inputs + * i.e. total number of blocks of the input files.

+ * + *

The right level of parallelism for maps seems to be around 10-100 maps + * per node, although it has been set up to 300 or so for very cpu-light map + * tasks. Task setup takes a while, so it is best if the maps take at least a + * minute to execute.

+ * + *

The default behavior of file-based {@link InputFormat}s is to split the + * input into logical {@link InputSplit}s based on the total size, in + * bytes, of input files. However, the {@link FileSystem} blocksize of the + * input files is treated as an upper bound for input splits. A lower bound + * on the split size can be set via + * + * mapred.min.split.size.

+ * + *

Thus, if you expect 10TB of input data and have a blocksize of 128MB, + * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is + * used to set it even higher.
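The 82,000 figure is the rounded result of dividing the input size by the blocksize. A minimal sketch of that arithmetic follows, assuming binary units (1TB = 2^40 bytes, 1MB = 2^20 bytes), a single large input, and no mapred.min.split.size override; MapCountSketch and estimateMaps are hypothetical names, not part of the framework.

```java
// Hypothetical sketch: estimates the number of splits when each split is capped at one block.
final class MapCountSketch {

    /** Ceiling of totalInputBytes / blockSizeBytes. */
    static long estimateMaps(long totalInputBytes, long blockSizeBytes) {
        if (totalInputBytes < 0 || blockSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and blocksize positive");
        }
        return (totalInputBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long tenTerabytes = 10L * 1024 * 1024 * 1024 * 1024;
        long blockSize = 128L * 1024 * 1024;
        // 10 * 2^40 / 2^27 = 10 * 2^13 = 81920, i.e. roughly 82,000 maps.
        System.out.println(estimateMaps(tenTerabytes, blockSize));
    }
}
```

With many input files the real count can only be higher than this estimate, since every file contributes at least one split.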

+ * + * @param n the number of map tasks for this job. + * @see InputFormat#getSplits(JobConf, int) + * @see FileInputFormat + * @see FileSystem#getDefaultBlockSize() + * @see FileStatus#getBlockSize() + */ public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); } + /** + * Get configured the number of reduce tasks for this job. Defaults to + * 1. + * + * @return the number of reduce tasks for this job. + */ public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); } + + /** + * Set the requisite number of reduce tasks for this job. + * + *

How many reduces?

+ * + *

The right number of reduces seems to be 0.95 or + * 1.75 multiplied by (<no. of nodes> * + * + * mapred.tasktracker.tasks.maximum). + *

+ * + *

With 0.95 all of the reduces can launch immediately and + * start transferring map outputs as the maps finish. With 1.75 + * the faster nodes will finish their first round of reduces and launch a + * second wave of reduces, doing a much better job of load balancing.

+ * + *

Increasing the number of reduces increases the framework overhead, but + * increases load balancing and lowers the cost of failures.

+ * + *

The scaling factors above are slightly less than whole numbers to + * reserve a few reduce slots in the framework for speculative-tasks, failures + * etc.
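As a rough illustration of the rule of thumb above, the suggested reduce counts can be computed from the cluster size. This is only a sketch: ReduceCountSketch and suggestReduces are hypothetical names, the scaling factors are passed as integer percentages (95 and 175) to avoid floating-point rounding, and the result is rounded down.

```java
// Hypothetical sketch: reduces = factor * (number of nodes * mapred.tasktracker.tasks.maximum).
final class ReduceCountSketch {

    static final int SINGLE_WAVE_PERCENT = 95;   // the 0.95 factor: all reduces launch at once
    static final int TWO_WAVE_PERCENT = 175;     // the 1.75 factor: faster nodes run a second wave

    /** Suggested number of reduces, rounded down to a whole task. */
    static int suggestReduces(int factorPercent, int nodes, int maxTasksPerTracker) {
        if (factorPercent <= 0 || nodes <= 0 || maxTasksPerTracker <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        long slots = (long) nodes * maxTasksPerTracker;
        return (int) (slots * factorPercent / 100);
    }

    public static void main(String[] args) {
        // Example cluster (an assumption for illustration): 10 nodes, 2 task slots per tasktracker.
        System.out.println(suggestReduces(SINGLE_WAVE_PERCENT, 10, 2)); // 20 slots * 0.95 = 19
        System.out.println(suggestReduces(TWO_WAVE_PERCENT, 10, 2));    // 20 slots * 1.75 = 35
    }
}
```

The value would then be passed to {@link #setNumReduceTasks(int)}; in the 0.95 case the one slot left over is the headroom reserved for speculative tasks and failures.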

+ * + *

Reducer NONE

+ * + *

It is legal to set the number of reduce-tasks to zero.

+ * + *

In this case the outputs of the map-tasks go directly to the distributed + * file-system, to the path set by {@link #setOutputPath(Path)}. Also, the + * framework doesn't sort the map-outputs before writing them out to HDFS.

+ * + * @param n the number of reduce tasks for this job. + */ public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); } - /** Get the configured number of maximum attempts that will be made to run a - * map task, as specified by the mapred.map.max.attempts - * property. If this property is not already set, the default is 4 attempts - * @return the max number of attempts + /** + * Get the configured number of maximum attempts that will be made to run a + * map task, as specified by the mapred.map.max.attempts + * property. If this property is not already set, the default is 4 attempts. + * + * @return the max number of attempts per map task. */ public int getMaxMapAttempts() { return getInt("mapred.map.max.attempts", 4); } - /** Expert: Set the number of maximum attempts that will be made to run a - * map task - * @param n the number of attempts - * + + /** + * Expert: Set the number of maximum attempts that will be made to run a + * map task. + * + * @param n the number of attempts per map task. */ public void setMaxMapAttempts(int n) { setInt("mapred.map.max.attempts", n); } - /** Get the configured number of maximum attempts that will be made to run a - * reduce task, as specified by the mapred.reduce.max.attempts - * property. If this property is not already set, the default is 4 attempts - * @return the max number of attempts + /** + * Get the configured number of maximum attempts that will be made to run a + * reduce task, as specified by the mapred.reduce.max.attempts + * property. If this property is not already set, the default is 4 attempts. + * + * @return the max number of attempts per reduce task. */ public int getMaxReduceAttempts() { return getInt("mapred.reduce.max.attempts", 4); } - /** Expert: Set the number of maximum attempts that will be made to run a - * reduce task - * @param n the number of attempts - * + /** + * Expert: Set the number of maximum attempts that will be made to run a + * reduce task. 
+ * + * @param n the number of attempts per reduce task. */ public void setMaxReduceAttempts(int n) { setInt("mapred.reduce.max.attempts", n); @@ -600,7 +1000,8 @@ /** * Get the user-specified job name. This is only used to identify the * job to the user. - * @return the job's name, defaulting to "" + * + * @return the job's name, defaulting to "". */ public String getJobName() { return get("mapred.job.name", ""); @@ -608,7 +1009,8 @@ /** * Set the user-specified job name. - * @param name the job's new name + * + * @param name the job's new name. */ public void setJobName(String name) { set("mapred.job.name", name); @@ -627,16 +1029,16 @@ * When not running under HOD, this identifer is expected to remain set to * the empty string. * - * @return the session identifier, defaulting to "" + * @return the session identifier, defaulting to "". */ public String getSessionId() { return get("session.id", ""); } /** - * Set the user-specified session idengifier. + * Set the user-specified session identifier. * - * @param sessionId the new session id + * @param sessionId the new session id. */ public void setSessionId(String sessionId) { set("session.id", sessionId); @@ -644,6 +1046,8 @@ /** * Set the maximum no. of failures of a given job per tasktracker. + * If the no. of task failures exceeds noFailures, the + * tasktracker is blacklisted for this job. * * @param noFailures maximum no. of failures of a given job per tasktracker. */ @@ -652,7 +1056,9 @@ } /** - * Get the maximum no. of failures of a given job per tasktracker. + * Expert: Get the maximum no. of failures of a given job per tasktracker. + * If the no. of task failures exceeds this, the tasktracker is + * blacklisted for this job. * * @return the maximum no. of failures of a given job per tasktracker. */ @@ -662,21 +1068,30 @@ /** * Get the maximum percentage of map tasks that can fail without - * the job being aborted. + * the job being aborted. 
+ * + * Each map task is executed a minimum of {@link #getMaxMapAttempts()} + * attempts before being declared as failed. + * + * Defaults to zero, i.e. any failed map-task results in + * the job being declared as {@link JobStatus#FAILED}. * * @return the maximum percentage of map tasks that can fail without - * the job being aborted + * the job being aborted. */ public int getMaxMapTaskFailuresPercent() { return getInt("mapred.max.map.failures.percent", 0); } /** - * Set the maximum percentage of map tasks that can fail without the job - * being aborted. + * Expert: Set the maximum percentage of map tasks that can fail without the + * job being aborted. + * + * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts + * before being declared as failed. * * @param percent the maximum percentage of map tasks that can fail without - * the job being aborted + * the job being aborted. */ public void setMaxMapTaskFailuresPercent(int percent) { setInt("mapred.max.map.failures.percent", percent); @@ -684,10 +1099,16 @@ /** * Get the maximum percentage of reduce tasks that can fail without - * the job being aborted. + * the job being aborted. + * + * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} + * attempts before being declared as failed. + * + * Defaults to zero, i.e. any failed reduce-task results + * in the job being declared as {@link JobStatus#FAILED}. * * @return the maximum percentage of reduce tasks that can fail without - * the job being aborted + * the job being aborted. */ public int getMaxReduceTaskFailuresPercent() { return getInt("mapred.max.reduce.failures.percent", 0); @@ -697,24 +1118,29 @@ * Set the maximum percentage of reduce tasks that can fail without the job * being aborted. * + * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} + * attempts before being declared as failed. 
+ * * @param percent the maximum percentage of reduce tasks that can fail without - * the job being aborted + * the job being aborted. */ public void setMaxReduceTaskFailuresPercent(int percent) { setInt("mapred.max.reduce.failures.percent", percent); } /** - * Set job priority for this job. + * Set {@link JobPriority} for this job. * - * @param prio + * @param prio the {@link JobPriority} for this job. */ public void setJobPriority(JobPriority prio) { set("mapred.job.priority", prio.toString()); } /** - * Get the job priority for this job. + * Get the {@link JobPriority} for this job. + * + * @return the {@link JobPriority} for this job. */ public JobPriority getJobPriority() { String prio = get("mapred.job.priority"); @@ -725,12 +1151,44 @@ return JobPriority.valueOf(prio); } + /** + * Get the uri to be invoked in-order to send a notification after the job + * has completed (success/failure). + * + * @return the job end notification uri, null if it hasn't + * been set. + * @see #setJobEndNotificationURI(String) + */ + public String getJobEndNotificationURI() { + return get("job.end.notification.url"); + } + + /** + * Set the uri to be invoked in-order to send a notification after the job + * has completed (success/failure). + * + *

The uri can contain 2 special parameters: $jobId and + * $jobStatus. Those, if present, are replaced by the job's + * identifier and completion-status respectively.
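The placeholder substitution described above can be sketched in plain Java. This is only an illustration: the class `JobEndUriSketch`, the method `expand`, and the example URL are hypothetical and not part of Hadoop; the actual replacement is done by the framework.

```java
public class JobEndUriSketch {

    // Hypothetical helper (not a Hadoop API): expands the two special
    // parameters, $jobId and $jobStatus, in a notification URI template.
    static String expand(String uri, String jobId, String jobStatus) {
        // String.replace performs literal replacement, so '$' needs no escaping.
        return uri.replace("$jobId", jobId).replace("$jobStatus", jobStatus);
    }

    public static void main(String[] args) {
        // Assumed example template; any URI containing the placeholders works.
        String template = "http://example.org/notify?id=$jobId&status=$jobStatus";
        System.out.println(expand(template, "job_0001", "SUCCEEDED"));
    }
}
```

A template without either placeholder is returned unchanged, which matches the "if present" wording above.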

+ * + *

This is typically used by application-writers to implement chaining of + * Map-Reduce jobs in an asynchronous manner.

+ * + * @param uri the job end notification uri + * @see JobStatus + * @see Job Completion and Chaining + */ + public void setJobEndNotificationURI(String uri) { + set("job.end.notification.url", uri); + } - /** Find a jar that contains a class of the same name, if any. + /** + * Find a jar that contains a class of the same name, if any. * It will return a jar file, even if that is not the first thing * on the class path that has a class with the same name. - * @param my_class the class to find - * @return a jar file that contains the class, or null + * + * @param my_class the class to find. + * @return a jar file that contains the class, or null. * @throws IOException */ private static String findContainingJar(Class my_class) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Oct 24 14:18:51 2007 @@ -95,7 +95,7 @@ private static JobEndStatusInfo createNotification(JobConf conf, JobStatus status) { JobEndStatusInfo notification = null; - String uri = conf.get("job.end.notification.url"); + String uri = conf.getJobEndNotificationURI(); if (uri != null) { // +1 to make logic for first notification identical to a retry int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java Wed Oct 24 14:18:51 2007 @@ -23,8 +23,11 @@ import org.apache.hadoop.io.Closeable; import org.apache.hadoop.mapred.JobConfigurable; -/** Base class for {@link Mapper} and {@link Reducer} implementations. - * Provides default implementations for a few methods. +/** + * Base class for {@link Mapper} and {@link Reducer} implementations. + * + *

Provides default no-op implementations for a few methods; most non-trivial + * applications need to override some of them.

*/ public class MapReduceBase implements Closeable, JobConfigurable { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java Wed Oct 24 14:18:51 2007 @@ -23,15 +23,28 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Expert: Permits greater control of map processing. For example, - * implementations might perform multi-threaded, asynchronous mappings. */ +/** + * Expert: Generic interface for {@link Mapper}s. + * + *

Custom implementations of MapRunnable can exert greater + * control on map processing e.g. multi-threaded, asynchronous mappers etc.

+ * + * @see Mapper + */ public interface MapRunnable extends JobConfigurable { - /** Called to execute mapping. Mapping is complete when this returns. - * @param input the {@link RecordReader} with input key/value pairs. - * @param output the {@link OutputCollector} for mapped key/value pairs. + /** + * Start mapping input <key, value> pairs. + * + *

Mapping of input records to output records is complete when this method + * returns.

+ * + * @param input the {@link RecordReader} to read the input records. + * @param output the {@link OutputCollector} to collect the outputrecords. + * @param reporter {@link Reporter} to report progress, status-updates etc. + * @throws IOException */ void run(RecordReader input, OutputCollector output, Reporter reporter) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java Wed Oct 24 14:18:51 2007 @@ -20,29 +20,141 @@ import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Closeable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.compress.CompressionCodec; -/** Maps input key/value pairs to a set of intermediate key/value pairs. All - * intermediate values associated with a given output key are subsequently - * grouped by the map/reduce system, and passed to a {@link Reducer} to - * determine the final output.. */ +/** + * Maps input key/value pairs to a set of intermediate key/value pairs. + * + *

Maps are the individual tasks which transform input records into + * intermediate records. The transformed intermediate records need not be of + * the same type as the input records. A given input pair may map to zero or + * many output pairs.

+ * + *

The Hadoop Map-Reduce framework spawns one map task for each + * {@link InputSplit} generated by the {@link InputFormat} for the job. + * Mapper implementations can access the {@link JobConf} for the + * job via the {@link JobConfigurable#configure(JobConf)} and initialize + * themselves. Similarly they can use the {@link Closeable#close()} method for + * de-initialization.

+ * + *

The framework then calls + * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)} + * for each key/value pair in the InputSplit for that task.

+ * + *

All intermediate values associated with a given output key are + * subsequently grouped by the framework, and passed to a {@link Reducer} to + * determine the final output. Users can control the grouping by specifying + * a Comparator via + * {@link JobConf#setOutputKeyComparatorClass(Class)}.
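The grouping step described above can be pictured with a small in-memory sketch. It assumes String keys and Integer values and uses a `TreeMap` as a stand-in for the framework's sort and group; `GroupByKeySketch`, `pair` and `group` are hypothetical names, not Hadoop classes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class GroupByKeySketch {

    // Hypothetical helper to build one intermediate <key, value> pair.
    static Map.Entry<String, Integer> pair(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Groups all values that share a key; keys come out in sorted order,
    // which is roughly what a Reducer sees as <key, (list of values)>.
    static SortedMap<String, List<Integer>> group(List<Map.Entry<String, Integer>> mapOutputs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapOutputs) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> outputs =
            Arrays.asList(pair("b", 1), pair("a", 2), pair("b", 3));
        System.out.println(group(outputs)); // prints {a=[2], b=[1, 3]}
    }
}
```

Passing a custom `Comparator` to the `TreeMap` constructor would play the role that the output key comparator plays in a real job.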

+ * + *

The grouped Mapper outputs are partitioned per + * Reducer. Users can control which keys (and hence records) go to + * which Reducer by implementing a custom {@link Partitioner}. + * + *

Users can optionally specify a combiner, via + * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the + * intermediate outputs, which helps to cut down the amount of data transferred + * from the Mapper to the Reducer. + * + *

The intermediate, grouped outputs are always stored in + * {@link SequenceFile}s. Applications can specify if and how the intermediate + * outputs are to be compressed and which {@link CompressionCodec}s are to be + * used via the JobConf.

+ * + *

If the job has + * zero + * reduces then the output of the Mapper is directly written + * to the {@link FileSystem} without grouping by keys.

+ * + *

Example:

+ *

+ *     public class MyMapper<K extends WritableComparable, V extends Writable> 
+ *     extends MapReduceBase implements Mapper<K, V, K, V> {
+ *     
+ *       static enum MyCounters { NUM_RECORDS }
+ *       
+ *       private String mapTaskId;
+ *       private String inputFile;
+ *       private int noRecords = 0;
+ *       
+ *       public void configure(JobConf job) {
+ *         mapTaskId = job.get("mapred.task.id");
+ *         inputFile = job.get("mapred.input.file");
+ *       }
+ *       
+ *       public void map(K key, V val,
+ *                       OutputCollector<K, V> output, Reporter reporter)
+ *       throws IOException {
+ *         // Process the <key, value> pair (assume this takes a while)
+ *         // ...
+ *         // ...
+ *         
+ *         // Let the framework know that we are alive, and kicking!
+ *         // reporter.progress();
+ *         
+ *         // Process some more
+ *         // ...
+ *         // ...
+ *         
+ *         // Increment the no. of <key, value> pairs processed
+ *         ++noRecords;
+ *
+ *         // Increment counters
+ *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
+ *        
+ *         // Every 100 records update application-level status
+ *         if ((noRecords%100) == 0) {
+ *           reporter.setStatus(mapTaskId + " processed " + noRecords + 
+ *                              " from input-file: " + inputFile); 
+ *         }
+ *         
+ *         // Output the result
+ *         output.collect(key, val);
+ *       }
+ *     }
+ *
+ * 

Applications may write a custom {@link MapRunnable} to exert greater + * control on map processing e.g. multi-threaded Mappers etc.

+ * + * @see JobConf + * @see InputFormat + * @see Partitioner + * @see Reducer + * @see MapReduceBase + * @see MapRunnable + * @see SequenceFile + */ public interface Mapper extends JobConfigurable, Closeable { - /** Maps a single input key/value pair into intermediate key/value pairs. - * Output pairs need not be of the same types as input pairs. A given input - * pair may map to zero or many output pairs. Output pairs are collected - * with calls to {@link - * OutputCollector#collect(WritableComparable,Writable)}. + /** + * Maps a single input key/value pair into an intermediate key/value pair. + * + *

Output pairs need not be of the same types as input pairs. A given + * input pair may map to zero or many output pairs. Output pairs are + * collected with calls to + * {@link OutputCollector#collect(WritableComparable,Writable)}.

* - * @param key the key - * @param value the values - * @param output collects mapped keys and values + *

Applications can use the {@link Reporter} provided to report progress + * or just indicate that they are alive. In scenarios where the application + * takes a significant amount of time to process individual key/value + * pairs, this is crucial since the framework might assume that the task has + * timed-out and kill that task. The other way of avoiding this is to set + * + * mapred.task.timeout to a high-enough value (or even zero for no + * time-outs).

+ * + * @param key the input key. + * @param value the input value. + * @param output collects mapped keys and values. + * @param reporter facility to report progress. */ - void map(K1 key, V1 value, - OutputCollector output, Reporter reporter) - throws IOException; + void map(K1 key, V1 value, OutputCollector output, Reporter reporter) + throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java Wed Oct 24 14:18:51 2007 @@ -24,15 +24,23 @@ import org.apache.hadoop.io.WritableComparable; -/** Passed to {@link Mapper} and {@link Reducer} implementations to collect - * output data. */ +/** + * Collects the <key, value> pairs output by {@link Mapper}s + * and {@link Reducer}s. + * + *

OutputCollector is the generalization of the facility + * provided by the Map-Reduce framework to collect data output by either the + * Mapper or the Reducer i.e. intermediate outputs + * or the output of the job.

+ */ public interface OutputCollector { /** Adds a key/value pair to the output. * - * @param key the key to add - * @param value to value to add + * @param key the key to collect. + * @param value to value to collect. + * @throws IOException */ void collect(K key, V value) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Wed Oct 24 14:18:51 2007 @@ -25,28 +25,53 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; -/** An output data format. Output files are stored in a {@link - * FileSystem}. */ +/** + * OutputFormat describes the output-specification for a + * Map-Reduce job. + * + *

The Map-Reduce framework relies on the OutputFormat of the + * job to:

+ *

    + *
  1. + * Validate the output-specification of the job. For example, check that the + * output directory doesn't already exist. + *
  2. + * Provide the {@link RecordWriter} implementation to be used to write out + * the output files of the job. Output files are stored in a + * {@link FileSystem}. + *
  3. + *
+ * + * @see RecordWriter + * @see JobConf + */ public interface OutputFormat { - /** Construct a {@link RecordWriter} with Progressable. + /** + * Get the {@link RecordWriter} for the given job. * - * @param job the job whose output is being written - * @param name the unique name for this part of the output - * @param progress mechanism for reporting progress while writing to file - * @return a {@link RecordWriter} + * @param ignored + * @param job configuration for the job whose output is being written. + * @param name the unique name for this part of the output. + * @param progress mechanism for reporting progress while writing to file. + * @return a {@link RecordWriter} to write the output for the job. + * @throws IOException */ RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) - throws IOException; + throws IOException; - /** Check whether the output specification for a job is appropriate. Called - * when a job is submitted. Typically checks that it does not already exist, + /** + * Check for validity of the output-specification for the job. + * + *

This validates the output specification for the job when the + * job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not - * overwritten. + * overwritten.

* - * @param job the job whose output will be written + * @param ignored + * @param job job configuration. * @throws IOException when output should not be attempted */ void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java Wed Oct 24 14:18:51 2007 @@ -21,18 +21,32 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Partitions the key space. A partition is created for each reduce task. */ +/** + * Partitions the key space. + * + *

Partitioner controls the partitioning of the keys of the + * intermediate map-outputs. The key (or a subset of the key) is used to derive + * the partition, typically by a hash function. The total number of partitions + * is the same as the number of reduce tasks for the job. Hence this controls + * which of the m reduce tasks the intermediate key (and hence the + * record) is sent to for reduction.

+ * + * @see Reducer + */ public interface Partitioner extends JobConfigurable { - /** Returns the paritition number for a given entry given the total number of - * partitions. Typically a hash function on a all or a subset of the key. + /** + * Get the partition number for a given key (hence record) given the total + * number of partitions i.e. number of reduce-tasks for the job. + * + *

Typically a hash function on all or a subset of the key.
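A hash-based partition function of the kind described above might look like the following sketch. It is a standalone illustration with a hypothetical class name and a simplified signature (no value argument), not the `Partitioner` interface itself.

```java
public class HashPartitionSketch {

    // Maps a key to a partition in [0, numPartitions).
    // Masking with Integer.MAX_VALUE clears the sign bit, so keys whose
    // hashCode() is negative still yield a non-negative partition number.
    static int getPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numReduceTasks = 4; // assumed number of reduce tasks
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            System.out.println(key + " -> partition " + getPartition(key, numReduceTasks));
        }
    }
}
```

Because the result depends only on the key's hash, every record with the same key is sent to the same reduce task.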

* - * @param key the entry key - * @param value the entry value - * @param numPartitions the number of partitions - * @return the partition number + * @param key the key to be paritioned. + * @param value the entry value. + * @param numPartitions the total number of partitions. + * @return the partition number for the key. */ int getPartition(K2 key, V2 value, int numPartitions); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java Wed Oct 24 14:18:51 2007 @@ -24,11 +24,23 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Reads key/value pairs from an input file {@link FileSplit}. - * Implemented by {@link InputFormat} implementations. */ +/** + * RecordReader reads <key, value> pairs from an + * {@link InputSplit}. + * + *

RecordReader, typically, converts the byte-oriented view of + * the input, provided by the InputSplit, and presents a + * record-oriented view for the {@link Mapper} & {@link Reducer} tasks for + * processing. It thus assumes the responsibility of processing record + * boundaries and presenting the tasks with keys and values.

+ * + * @see InputSplit + * @see InputFormat + */ public interface RecordReader { - /** Reads the next key/value pair. + /** + * Reads the next key/value pair from the input for processing. * * @param key the key to read data into * @param value the value to read data into @@ -40,25 +52,38 @@ /** * Create an object of the appropriate type to be used as a key. - * @return a new key object + * + * @return a new key object. */ K createKey(); /** - * Create an object of the appropriate type to be used as the value. - * @return a new value object + * Create an object of the appropriate type to be used as a value. + * + * @return a new value object. */ V createValue(); - /** Returns the current position in the input. */ + /** + * Returns the current position in the input. + * + * @return the current position in the input. + * @throws IOException + */ long getPos() throws IOException; - /** Close this to future operations.*/ + /** + * Close this {@link InputSplit} to future operations. + * + * @throws IOException + */ public void close() throws IOException; /** - * How far has the reader gone through the input. - * @return progress from 0.0 to 1.0 + * How much of the input has the {@link RecordReader} consumed i.e. + * has been processed by? + * + * @return progress from 0.0 to 1.0. 
* @throws IOException */ float getProgress() throws IOException; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java Wed Oct 24 14:18:51 2007 @@ -21,22 +21,36 @@ import java.io.IOException; import java.io.DataOutput; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; -/** Writes key/value pairs to an output file. Implemented by {@link - * OutputFormat} implementations. */ +/** + * RecordWriter writes the output <key, value> pairs + * to an output file. + + *

RecordWriter implementations write the job outputs to the + * {@link FileSystem}. + * + * @see OutputFormat + */ public interface RecordWriter { - /** Writes a key/value pair. - * - * @param key the key to write - * @param value the value to write + /** + * Writes a key/value pair. * + * @param key the key to write. + * @param value the value to write. + * @throws IOException * @see Writable#write(DataOutput) */ void write(K key, V value) throws IOException; - /** Close this to future operations.*/ + /** + * Close this RecordWriter to future operations. + * + * @param reporter facility to report progress. + * @throws IOException + */ void close(Reporter reporter) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java Wed Oct 24 14:18:51 2007 @@ -22,24 +22,176 @@ import java.util.Iterator; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Closeable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Reduces a set of intermediate values which share a key to a smaller set of - * values. Input values are the grouped output of a {@link Mapper}. */ +/** + * Reduces a set of intermediate values which share a key to a smaller set of + * values. + * + *

The number of Reducers for the job is set by the user via + * {@link JobConf#setNumReduceTasks(int)}. Reducer implementations + * can access the {@link JobConf} for the job via the + * {@link JobConfigurable#configure(JobConf)} method and initialize themselves. + * Similarly they can use the {@link Closeable#close()} method for + * de-initialization.

+ + *

Reducer has 3 primary phases:

+ *
    + *
  1. + * + *

    Shuffle

    + * + *

    The input to the Reducer is the grouped output of a {@link Mapper}. + * In this phase the framework, for each Reducer, fetches the + * relevant partition of the output of all the Mappers, via HTTP. + *

    + *
  2. + * + *
  3. + *

    Sort

    + * + *

    The framework groups Reducer inputs by keys + * (since different Mappers may have output the same key) in this + * stage.

    + * + *

    The shuffle and sort phases occur simultaneously i.e. while outputs are + * being fetched they are merged.

    + * + *
    SecondarySort
    + * + *

    If equivalence rules for keys while grouping the intermediates are + * different from those for grouping keys before reduction, then one may + * specify a Comparator via + * {@link JobConf#setOutputValueGroupingComparator(Class)}. Since + * {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to + * control how intermediate keys are grouped, these can be used in conjunction + * to simulate secondary sort on values.

    + * + * + * For example, say that you want to find duplicate web pages and tag them + * all with the url of the "best" known example. You would set up the job + * like: + *
      + *
    • Map Input Key: url
    • + *
    • Map Input Value: document
    • + *
    • Map Output Key: document checksum, url pagerank
    • + *
    • Map Output Value: url
    • + *
    • Partitioner: by checksum
    • + *
    • OutputKeyComparator: by checksum and then decreasing pagerank
    • + *
    • OutputValueGroupingComparator: by checksum
    • + *
    + *
  4. + * + *
  5. + *

    Reduce

    + * + *

    In this phase the + * {@link #reduce(WritableComparable, Iterator, OutputCollector, Reporter)} + * method is called for each <key, (list of values)> pair in + * the grouped inputs.

    + *

    The output of the reduce task is typically written to the + * {@link FileSystem} via + * {@link OutputCollector#collect(WritableComparable, Writable)}.

    + *
  6. + *
+ * + *

The output of the Reducer is not re-sorted.
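The duplicate-web-page example from the SecondarySort phase above can be simulated in memory as a rough sketch. The `Page` class, its fields, and `bestUrlPerChecksum` are hypothetical; a real job would express the same ordering through the two comparators set on the JobConf rather than through `List.sort`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SecondarySortSketch {

    // Hypothetical record: map output key (checksum, pagerank) plus value (url).
    static final class Page {
        final String checksum;
        final int pagerank;
        final String url;

        Page(String checksum, int pagerank, String url) {
            this.checksum = checksum;
            this.pagerank = pagerank;
            this.url = url;
        }
    }

    // Sort by checksum, then by decreasing pagerank (the key comparator),
    // and group by checksum only (the value grouping comparator).
    // The first url seen in each group is the "best" known example.
    static Map<String, String> bestUrlPerChecksum(List<Page> pages) {
        List<Page> sorted = new ArrayList<>(pages);
        sorted.sort(Comparator.comparing((Page p) -> p.checksum)
            .thenComparing(Comparator.comparingInt((Page p) -> p.pagerank).reversed()));
        Map<String, String> best = new LinkedHashMap<>();
        for (Page p : sorted) {
            best.putIfAbsent(p.checksum, p.url);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Page> pages = Arrays.asList(
            new Page("c1", 3, "a.example"),
            new Page("c1", 9, "b.example"),
            new Page("c2", 1, "c.example"));
        System.out.println(bestUrlPerChecksum(pages)); // prints {c1=b.example, c2=c.example}
    }
}
```

The point of the two-level ordering is that the reducer never has to buffer and sort the values itself; the highest-ranked url simply arrives first within each checksum group.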

+ * + *

Example:

+ *

+ *     public class MyReducer<K extends WritableComparable, V extends Writable> 
+ *     extends MapReduceBase implements Reducer<K, V, K, V> {
+ *     
+ *       static enum MyCounters { NUM_RECORDS }
+ *        
+ *       private String reduceTaskId;
+ *       private int noKeys = 0;
+ *       
+ *       public void configure(JobConf job) {
+ *         reduceTaskId = job.get("mapred.task.id");
+ *       }
+ *       
+ *       public void reduce(K key, Iterator<V> values,
+ *                          OutputCollector<K, V> output, 
+ *                          Reporter reporter)
+ *       throws IOException {
+ *       
+ *         // Process
+ *         int noValues = 0;
+ *         while (values.hasNext()) {
+ *           V value = values.next();
+ *           
+ *           // Increment the no. of values for this key
+ *           ++noValues;
+ *           
+ *           // Process the <key, value> pair (assume this takes a while)
+ *           // ...
+ *           // ...
+ *           
+ *           // Let the framework know that we are alive, and kicking!
+ *           if ((noValues%10) == 0) {
+ *             reporter.progress();
+ *           }
+ *         
+ *           // Process some more
+ *           // ...
+ *           // ...
+ *           
+ *           // Output the <key, value> 
+ *           output.collect(key, value);
+ *         }
+ *         
+ *         // Increment the no. of <key, list of values> pairs processed
+ *         ++noKeys;
+ *         
+ *         // Increment counters
+ *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
+ *         
+ *         // Every 100 keys update application-level status
+ *         if ((noKeys%100) == 0) {
+ *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
+ *         }
+ *       }
+ *     }
+ * 

+ * + * @see Mapper + * @see Partitioner + * @see Reporter + * @see MapReduceBase + */ public interface Reducer extends JobConfigurable, Closeable { - /** Combines values for a given key. Output values must be of the same type - * as input values. Input keys must not be altered. Typically all values - * are combined into zero or one value. Output pairs are collected with - * calls to {@link OutputCollector#collect(WritableComparable,Writable)}. + /** + * Reduces values for a given key. + * + *

The framework calls this method for each + * <key, (list of values)> pair in the grouped inputs. + * Output values must be of the same type as input values. Input keys must + * not be altered. Typically all values are combined into zero or one value. + *

+ * + *

Output pairs are collected with calls to + * {@link OutputCollector#collect(WritableComparable,Writable)}.

* - * @param key the key - * @param values the values to combine - * @param output to collect combined values + *

Applications can use the {@link Reporter} provided to report progress + * or just indicate that they are alive. In scenarios where the application + * takes a significant amount of time to process individual key/value + * pairs, this is crucial since the framework might assume that the task has + * timed-out and kill that task. The other way of avoiding this is to set + * + * mapred.task.timeout to a high-enough value (or even zero for no + * time-outs).

+ * + * @param key the key. + * @param values the list of values to reduce. + * @param output to collect keys and combined values. + * @param reporter facility to report progress. */ void reduce(K2 key, Iterator values, OutputCollector output, Reporter reporter) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Oct 24 14:18:51 2007 @@ -18,11 +18,24 @@ package org.apache.hadoop.mapred; -import java.io.IOException; - import org.apache.hadoop.util.Progressable; -/** Passed to application code to permit alteration of status. */ +/** + * A facility for Map-Reduce applications to report progress and update + * counters, status information etc. + * + *

{@link Mapper} and {@link Reducer} can use the Reporter + * provided to report progress or just indicate that they are alive. In + * scenarios where the application takes a significant amount of time to + * process individual key/value pairs, this is crucial since the framework + * might otherwise assume that the task has timed out and kill it. + * + *

Applications can also update {@link Counters} via the provided + * Reporter .

+ * + * @see Progressable + * @see Counters + */ public interface Reporter extends Progressable { /** @@ -41,25 +54,27 @@ }; /** - * Alter the application's status description. + * Set the status description for the task. * - * @param status - * a brief description of the current status + * @param status brief description of the current status. */ public abstract void setStatus(String status); /** * Increments the counter identified by the key, which can be of - * any enum type, by the specified amount. - * @param key A value of any enum type + * any {@link Enum} type, by the specified amount. + * + * @param key key to identify the counter to be incremented. The key can be + * any Enum. * @param amount A non-negative amount by which the counter is to - * be incremented + * be incremented. */ public abstract void incrCounter(Enum key, long amount); /** - * Get the InputSplit object for a map. - * @return the input split that the map is reading from + * Get the {@link InputSplit} object for a map. + * + * @return the InputSplit that the map is reading from. * @throws UnsupportedOperationException if called outside a mapper */ public abstract InputSplit getInputSplit() Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Wed Oct 24 14:18:51 2007 @@ -21,78 +21,120 @@ import java.io.*; /** - * Includes details on a running MapReduce job. A client can - * track a living job using this object. + * RunningJob is the user-interface to query for details on a + * running Map-Reduce job. + * + *

Clients can get hold of RunningJob via the {@link JobClient} + * and then query the running-job for details such as name, configuration, + * progress etc.

+ * + * @see JobClient */ public interface RunningJob { /** - * Returns an identifier for the job + * Get the job identifier. + * + * @return the job identifier. */ public String getJobID(); /** - * Returns the name of the job + * Get the name of the job. + * + * @return the name of the job. */ public String getJobName(); /** - * Returns the path of the submitted job. + * Get the path of the submitted job configuration. + * + * @return the path of the submitted job configuration. */ public String getJobFile(); /** - * Returns a URL where some job progress information will be displayed. + * Get the URL where some job progress information will be displayed. + * + * @return the URL where some job progress information will be displayed. */ public String getTrackingURL(); /** - * Returns a float between 0.0 and 1.0, indicating progress on - * the map portion of the job. When all map tasks have completed, - * the function returns 1.0. + * Get the progress of the job's map-tasks, as a float between 0.0 + * and 1.0. When all map tasks have completed, the function returns 1.0. + * + * @return the progress of the job's map-tasks. + * @throws IOException */ public float mapProgress() throws IOException; /** - * Returns a float between 0.0 and 1.0, indicating progress on - * the reduce portion of the job. When all reduce tasks have completed, - * the function returns 1.0. + * Get the progress of the job's reduce-tasks, as a float between 0.0 + * and 1.0. When all reduce tasks have completed, the function returns 1.0. + * + * @return the progress of the job's reduce-tasks. + * @throws IOException */ public float reduceProgress() throws IOException; /** - * Non-blocking function to check whether the job is finished or not. + * Check if the job is finished or not. + * This is a non-blocking call. + * + * @return true if the job is complete, else false. + * @throws IOException */ public boolean isComplete() throws IOException; /** - * True iff job completed successfully. 
+ * Check if the job completed successfully. + * + * @return true if the job succeeded, else false. + * @throws IOException */ public boolean isSuccessful() throws IOException; /** * Blocks until the job is complete. + * + * @throws IOException */ public void waitForCompletion() throws IOException; /** * Kill the running job. Blocks until all job tasks have been * killed as well. If the job is no longer running, it simply returns. + * + * @throws IOException */ public void killJob() throws IOException; - public TaskCompletionEvent[] getTaskCompletionEvents( - int startFrom) throws IOException; + /** + * Get events indicating completion (success/failure) of component tasks. + * + * @param startFrom index to start fetching events from + * @return an array of {@link TaskCompletionEvent}s + * @throws IOException + */ + public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) + throws IOException; /** * Kill indicated task attempt. - * @param taskId the id of the task to kill. - * @param shouldFail if true the task is failed and added to failed tasks list, otherwise - * it is just killed, w/o affecting job failure status. + * + * @param taskId the id of the task to be terminated. + * @param shouldFail if true the task is failed and added to failed tasks + * list, otherwise it is just killed, w/o affecting + * job failure status. + * @throws IOException */ public void killTask(String taskId, boolean shouldFail) throws IOException; /** * Gets the counters for this job. + * + * @return the counters for this job. 
+ * @throws IOException */ public Counters getCounters() throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html Wed Oct 24 14:18:51 2007 @@ -1,16 +1,212 @@ -

A system for scalable, fault-tolerant, distributed computation over -large data collections.

+

A software framework for easily writing applications which process vast +amounts of data (multi-terabyte data-sets) in parallel on large clusters +(thousands of nodes) built of commodity hardware in a reliable, fault-tolerant +manner.

-

Applications implement {@link org.apache.hadoop.mapred.Mapper} and -{@link org.apache.hadoop.mapred.Reducer} interfaces. These are submitted -as a {@link org.apache.hadoop.mapred.JobConf} and are applied to data -stored in a {@link org.apache.hadoop.fs.FileSystem}.

+

A Map-Reduce job usually splits the input data-set into independent +chunks which are processed by map tasks in a completely parallel manner, +followed by reduce tasks which aggregate their output. Typically both +the input and the output of the job are stored in a +{@link org.apache.hadoop.fs.FileSystem}. The framework takes care of monitoring +tasks and re-executing failed ones. Since the compute nodes and the +storage nodes are usually the same, i.e. Hadoop's Map-Reduce framework and Distributed +FileSystem run on the same set of nodes, tasks are effectively scheduled +on the nodes where data is already present, resulting in very high aggregate +bandwidth across the cluster.

-

See Google's -original Map/Reduce paper for background information.

+

The Map-Reduce framework operates exclusively on <key, value> +pairs i.e. the input to the job is viewed as a set of <key, value> +pairs and the output as another, possibly different, set of +<key, value> pairs. The keys and values have to +be serializable as {@link org.apache.hadoop.io.Writable}s and additionally the +keys have to be {@link org.apache.hadoop.io.WritableComparable}s in +order to facilitate grouping by the framework.

+ +

Data flow:

+
+                                (input)
+                                <k1, v1>
+       
+                                   |
+                                   V
+       
+                                  map
+       
+                                   |
+                                   V
+
+                                <k2, v2>
+       
+                                   |
+                                   V
+       
+                                combine
+       
+                                   |
+                                   V
+       
+                                <k2, v2>
+       
+                                   |
+                                   V
+       
+                                 reduce
+       
+                                   |
+                                   V
+       
+                                <k3, v3>
+                                (output)
+
+ +

Applications typically implement +{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} +and +{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} +methods. The application-writer also specifies various facets of the job such +as input and output locations, the Partitioner, InputFormat +& OutputFormat implementations to be used etc. as +a {@link org.apache.hadoop.mapred.JobConf}. The client program, +{@link org.apache.hadoop.mapred.JobClient}, then submits the job to the framework +and optionally monitors it.

+ +

The framework spawns one map task per +{@link org.apache.hadoop.mapred.InputSplit} generated by the +{@link org.apache.hadoop.mapred.InputFormat} of the job and calls +{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} +with each <key, value> pair read by the +{@link org.apache.hadoop.mapred.RecordReader} from the InputSplit for +the task. The intermediate outputs of the maps are then grouped by keys +and optionally aggregated by the combiner. The key space of the intermediate +outputs is partitioned by the {@link org.apache.hadoop.mapred.Partitioner}, where +the number of partitions is exactly the number of reduce tasks for the job.
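
The partitioning step described above can be illustrated with a small stand-alone sketch. It assumes a hash-based scheme (non-negative hash of the key modulo the number of reduce tasks), which is the idea behind a hash partitioner; the class and method names are hypothetical and do not use the Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HashPartitionSketch {

    // Assumption: partition = non-negative hash of the key, modulo the
    // number of reduce tasks. Masking with Integer.MAX_VALUE clears the
    // sign bit so negative hash codes still map to a valid index.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;

        // One bucket of intermediate keys per reduce task.
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new ArrayList<>());
        }

        // Equal keys always land in the same partition.
        for (String key : Arrays.asList("apple", "banana", "cherry", "apple")) {
            partitions.get(partitionFor(key, numReduceTasks)).add(key);
        }

        for (int i = 0; i < numReduceTasks; i++) {
            System.out.println("reduce task " + i + " <- " + partitions.get(i));
        }
    }
}
```

Because the partition depends only on the key, every value for a given key reaches the same reduce task, which is what makes per-key aggregation possible.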

+ +

The reduce tasks fetch the sorted intermediate outputs of the maps, via HTTP, +merge the <key, value> pairs and call +{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} +for each <key, list of values> pair. The output of the reduce tasks is +stored on the FileSystem by the +{@link org.apache.hadoop.mapred.RecordWriter} provided by the +{@link org.apache.hadoop.mapred.OutputFormat} of the job.
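
As a rough illustration of the reduce side, the following plain-Java sketch merges the outputs of several maps, groups the values by key in sorted order, and calls a reduce function once per <key, list of values> pair. It is a simplified in-memory model, not the framework's implementation; all names in it are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSideSketch {

    // Convenience constructor for an intermediate <key, value> pair.
    static Map.Entry<String, Long> pair(String key, long value) {
        return new AbstractMap.SimpleEntry<String, Long>(key, value);
    }

    // Merge the outputs of all maps and group the values by key.
    // A TreeMap keeps the keys sorted, as the reducer would see them.
    static Map<String, List<Long>> mergeAndGroup(
            List<List<Map.Entry<String, Long>>> mapOutputs) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Long>> output : mapOutputs) {
            for (Map.Entry<String, Long> p : output) {
                grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>())
                       .add(p.getValue());
            }
        }
        return grouped;
    }

    // Stand-in for a reducer: sums all values seen for one key.
    static long reduce(String key, Iterator<Long> values) {
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    // Call reduce once per <key, list of values> pair.
    static Map<String, Long> runReduce(
            List<List<Map.Entry<String, Long>>> mapOutputs) {
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> group
                : mergeAndGroup(mapOutputs).entrySet()) {
            result.put(group.getKey(),
                       reduce(group.getKey(), group.getValue().iterator()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> map0 =
            Arrays.asList(pair("bar", 1), pair("foo", 2));
        List<Map.Entry<String, Long>> map1 =
            Arrays.asList(pair("foo", 1));
        System.out.println(runReduce(Arrays.asList(map0, map1)));
        // prints {bar=1, foo=3}
    }
}
```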

+ +

Map-Reduce application to perform a distributed grep:

+

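+import java.io.IOException;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class Grep extends Configured implements Tool {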
+
+  // map: Search for the pattern specified by 'grep.mapper.regex' &
+  //      'grep.mapper.regex.group'
+
+  public static class GrepMapper<K extends WritableComparable>
+  extends MapReduceBase implements Mapper<K, Text, Text, LongWritable> {
+
+    private Pattern pattern;
+    private int group;
+
+    public void configure(JobConf job) {
+      pattern = Pattern.compile(job.get("grep.mapper.regex"));
+      group = job.getInt("grep.mapper.regex.group", 0);
+    }
+
+    public void map(K key, Text value,
+                    OutputCollector<Text, LongWritable> output,
+                    Reporter reporter)
+    throws IOException {
+      String text = value.toString();
+      Matcher matcher = pattern.matcher(text);
+      while (matcher.find()) {
+        output.collect(new Text(matcher.group(group)), new LongWritable(1));
+      }
+    }
+  }
+
+  // reduce: Count the number of occurrences of the pattern
+
+  public static class GrepReducer<K extends WritableComparable> extends MapReduceBase
+  implements Reducer<K, LongWritable, K, LongWritable> {
+
+    public void reduce(K key, Iterator<LongWritable> values,
+                       OutputCollector<K, LongWritable> output,
+                       Reporter reporter)
+    throws IOException {
+
+      // sum all values for this key
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+
+      // output sum
+      output.collect(key, new LongWritable(sum));
+    }
+  }
+  
+  public int run(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    JobConf grepJob = new JobConf(getConf(), Grep.class);
+    
+    grepJob.setJobName("grep");
+
+    grepJob.setInputPath(new Path(args[0]));
+    grepJob.setOutputPath(new Path(args[1]));
+
+    grepJob.setMapperClass(GrepMapper.class);
+    grepJob.setCombinerClass(GrepReducer.class);
+    grepJob.setReducerClass(GrepReducer.class);
+
+    grepJob.set("grep.mapper.regex", args[2]);
+    if (args.length == 4)
+      grepJob.set("grep.mapper.regex.group", args[3]);
+
+    grepJob.setOutputFormat(SequenceFileOutputFormat.class);
+    grepJob.setOutputKeyClass(Text.class);
+    grepJob.setOutputValueClass(LongWritable.class);
+
+    JobClient.runJob(grepJob);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Grep(), args);
+    System.exit(res);
+  }
+
+}
+
+ +

Notice how the data-flow of the above grep job is very similar to doing the +same via the unix pipeline:

+ +
+cat input/*   |   grep   |   sort    |   uniq -c   >   out
+
+ +
+      input   |    map   |  shuffle  |   reduce    >   out
+
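
To make the pipeline analogy concrete, here is a small plain-Java sketch that runs the same three stages in memory: a map step that emits regex matches, a sort/group step standing in for the shuffle, and a count standing in for the reduce. It does not use the Hadoop API, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LocalGrepSketch {

    // "map" (grep): emit every match of the pattern found in one line.
    static List<String> map(String line, Pattern pattern) {
        List<String> matches = new ArrayList<>();
        Matcher matcher = pattern.matcher(line);
        while (matcher.find()) {
            matches.add(matcher.group());
        }
        return matches;
    }

    // "shuffle" (sort) and "reduce" (uniq -c): the TreeMap keeps keys
    // sorted, and merge() counts the occurrences of each key.
    static Map<String, Long> grep(List<String> lines, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String match : map(line, pattern)) {
                counts.merge(match, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("foo bar", "bar baz", "foo foo");
        System.out.println(grep(lines, "foo|bar"));
        // prints {bar=2, foo=3}
    }
}
```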
+ +

Hadoop Map-Reduce applications need not be written in +Java(TM) only. +Hadoop Streaming is a utility +which allows users to create and run jobs with any executables (e.g. shell +utilities) as the mapper and/or the reducer. +Hadoop Pipes is a +SWIG-compatible C++ API to implement +Map-Reduce applications (not JNI(TM) based).

+ +

See Google's original +Map/Reduce paper for background information.

+ +

Java and JNI are trademarks or registered trademarks of +Sun Microsystems, Inc. in the United States and other countries.