+ * JobConf is the primary interface for a user to describe a
+ * map-reduce job to the Hadoop framework for execution. The framework tries to
+ * faithfully execute the job as-is described by JobConf, however:
+ *
+ * JobConf typically specifies the {@link Mapper}, combiner
+ * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
+ * {@link OutputFormat} implementations to be used etc. It also indicates the
+ * set of input files ({@link #setInputPath(Path)}/{@link #addInputPath(Path)}),
+ * and where the output files should be written ({@link #setOutputPath(Path)}).
+ *
+ * Optionally JobConf is used to specify other advanced facets
+ * of the job such as Comparators to be used, files to be put in
+ * the {@link DistributedCache}, whether or not intermediate and/or job outputs
+ * are to be compressed (and how) etc.
+ *
+ * Here is an example on how to configure a job via JobConf:
+ * + * @see JobClient + * @see ClusterStatus + * @see Tool + * @see DistributedCache + */ public class JobConf extends Configuration { private static final Log LOG = LogFactory.getLog(JobConf.class); @@ -61,6 +115,7 @@ /** * Construct a map/reduce job configuration. + * * @param exampleClass a class whose containing jar is used as the job's jar. */ public JobConf(Class exampleClass) { @@ -92,7 +147,7 @@ /** Construct a map/reduce configuration. * - * @param config a Configuration-format XML job description file + * @param config a Configuration-format XML job description file. */ public JobConf(String config) { this(new Path(config)); @@ -100,7 +155,7 @@ /** Construct a map/reduce configuration. * - * @param config a Configuration-format XML job description file + * @param config a Configuration-format XML job description file. */ public JobConf(Path config) { super(); @@ -111,6 +166,7 @@ /** * Checks if mapred-default.xml is on the CLASSPATH, if so * it warns the user and loads it as a {@link Configuration} resource. + * * @deprecated Remove in hadoop-0.16.0 via HADOOP-1843 */ private void checkWarnAndLoadMapredDefault() { @@ -122,12 +178,24 @@ } } + /** + * Get the user jar for the map-reduce job. + * + * @return the user jar for the map-reduce job. + */ public String getJar() { return get("mapred.jar"); } + + /** + * Set the user jar for the map-reduce job. + * + * @param jar the user jar for the map-reduce job. + */ public void setJar(String jar) { set("mapred.jar", jar); } /** * Set the job's jar file by finding an example class location. - * @param cls the example class + * + * @param cls the example class. */ public void setJarByClass(Class cls) { String jar = findContainingJar(cls); @@ -136,6 +204,11 @@ } } + /** + * Get the system directory where job-specific files are to be placed. + * + * @return the system directory where job-specific files are to be placed. + */ public Path getSystemDir() { return new Path(get("mapred.system.dir", "/tmp/hadoop/mapred/system")); } @@ -158,23 +231,41 @@ } } - /** Constructs a local file name. Files are distributed among configured - * local directories.*/ + /** + * Constructs a local file name. Files are distributed among configured + * local directories. + */ public Path getLocalPath(String pathString) throws IOException { return getLocalPath("mapred.local.dir", pathString); } + /** + * Set the {@link Path} of the input directory for the map-reduce job. + * + * @param dir the {@link Path} of the input directory for the map-reduce job. + */ public void setInputPath(Path dir) { dir = new Path(getWorkingDirectory(), dir); set("mapred.input.dir", dir.toString()); } + /** + * Add a {@link Path} to the list of inputs for the map-reduce job. + * + * @param dir {@link Path} to be added to the list of inputs for + * the map-reduce job. + */ public void addInputPath(Path dir) { dir = new Path(getWorkingDirectory(), dir); String dirs = get("mapred.input.dir"); set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir); } + /** + * Get the list of input {@link Path}s for the map-reduce job. + * + * @return the list of input {@link Path}s for the map-reduce job. + */ public Path[] getInputPaths() { String dirs = get("mapred.input.dir", ""); ArrayList list = Collections.list(new StringTokenizer(dirs, ",")); @@ -187,6 +278,7 @@ /** * Get the reported username for this job. + * * @return the username */ public String getUser() { @@ -195,7 +287,8 @@ /** * Set the reported username for this job. 
- * @param user the username + * + * @param user the username for this job. */ public void setUser(String user) { set("user.name", user); @@ -206,6 +299,10 @@ /** * Set whether the framework should keep the intermediate files for * failed tasks. + * + * @param keep+ * // Create a new JobConf + * JobConf job = new JobConf(new Configuration(), MyJob.class); + * + * // Specify various job-specific parameters + * job.setJobName("myjob"); + * + * job.setInputPath(new Path("in")); + * job.setOutputPath(new Path("out")); + * + * job.setMapperClass(MyJob.MyMapper.class); + * job.setCombinerClass(MyJob.MyReducer.class); + * job.setReducerClass(MyJob.MyReducer.class); + * + * job.setInputFormat(SequenceFileInputFormat.class); + * job.setOutputFormat(SequenceFileOutputFormat.class); + *
+ * true if framework should keep the intermediate files
+ * for failed tasks, false otherwise.
+ *
*/
public void setKeepFailedTaskFiles(boolean keep) {
setBoolean("keep.failed.task.files", keep);
@@ -213,6 +310,7 @@
/**
* Should the temporary files for failed tasks be kept?
+ *
* @return should the files be kept?
*/
public boolean getKeepFailedTaskFiles() {
@@ -223,6 +321,7 @@
* Set a regular expression for task names that should be kept.
* The regular expression ".*_m_000123_0" would keep the files
* for the first instance of map 123 that ran.
+ *
* @param pattern the java.util.regex.Pattern to match against the
* task names.
*/
@@ -233,15 +332,17 @@
/**
* Get the regular expression that is matched against the task names
* to see if we need to keep the files.
- * @return the pattern as a string, if it was set, othewise null
+ *
+ * @return the pattern as a string, if it was set, otherwise null.
*/
public String getKeepTaskFilesPattern() {
return get("keep.task.files.pattern");
}
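
For illustration, a minimal sketch of how a user might combine these two settings while debugging; the job and class names are assumed, not part of this patch:

    // Keep the intermediate files of failed tasks, plus the files of the
    // first attempt of map 123, for post-mortem inspection.
    JobConf debugJob = new JobConf(new Configuration(), MyJob.class);  // MyJob is hypothetical
    debugJob.setKeepFailedTaskFiles(true);
    debugJob.setKeepTaskFilesPattern(".*_m_000123_0");
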
/**
- * Set the current working directory for the default file system
- * @param dir the new current working directory
+ * Set the current working directory for the default file system.
+ *
+ * @param dir the new current working directory.
*/
public void setWorkingDirectory(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
@@ -250,7 +351,8 @@
/**
* Get the current working directory for the default file system.
- * @return the directory name
+ *
+ * @return the directory name.
*/
public Path getWorkingDirectory() {
String name = get("mapred.working.dir");
@@ -267,31 +369,107 @@
}
}
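
As a rough sketch of the side-file mechanism documented in the getOutputPath() javadoc added just below; the file name, its contents and the numRecords variable are made up for illustration:

    // Inside a map or reduce task: conf.getOutputPath() resolves to
    // ${mapred.output.dir}/_${taskid} at execution time, so each attempt
    // writes its side-file to a private location.
    Path sideFile = new Path(conf.getOutputPath(), "side-stats.txt");
    FSDataOutputStream out = FileSystem.get(conf).create(sideFile);
    out.writeBytes("records=" + numRecords + "\n");
    out.close();
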
+ /**
+ * Get the {@link Path} to the output directory for the map-reduce job.
+ *
+ * Some applications need to create/write-to side-files, which differ from + * the actual job-outputs. + * + *
In such cases there could be issues with 2 instances of the same TIP + * (running simultaneously e.g. speculative tasks) trying to open/write-to the + * same file (path) on HDFS. Hence the application-writer will have to pick + * unique names per task-attempt (e.g. using the taskid, say + * task_200709221812_0001_m_000000_0), not just per TIP.
+ * + *To get around this the Map-Reduce framework helps the application-writer + * out by maintaining a special ${mapred.output.dir}/_${taskid} + * sub-directory for each task-attempt on HDFS where the output of the + * task-attempt goes. On successful completion of the task-attempt the files + * in the ${mapred.output.dir}/_${taskid} (only) + * are promoted to ${mapred.output.dir}. Of course, the + * framework discards the sub-directory of unsuccessful task-attempts. This + * is completely transparent to the application.
+ * + *The application-writer can take advantage of this by creating any + * side-files required in ${mapred.output.dir} during execution of his + * reduce-task i.e. via {@link #getOutputPath()}, and the framework will move + * them out similarly - thus she doesn't have to pick unique paths per + * task-attempt.
+ *
+ * Note: the value of ${mapred.output.dir} during execution
+ * of a particular task-attempt is actually
+ * ${mapred.output.dir}/_${taskid}, not the value set by
+ * {@link #setOutputPath(Path)}. So, just create any side-files in the path
+ * returned by {@link #getOutputPath()} from map/reduce task to take
+ * advantage of this feature.
+ * + *The entire discussion holds true for maps of jobs with + * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + * goes directly to HDFS.
+ * + * @return the {@link Path} to the output directory for the map-reduce job. + */ public Path getOutputPath() { String name = get("mapred.output.dir"); return name == null ? null: new Path(name); } + /** + * Set the {@link Path} of the output directory for the map-reduce job. + * + *Note: + *
+ * @param dir the {@link Path} of the output directory for the map-reduce job. + */ public void setOutputPath(Path dir) { dir = new Path(getWorkingDirectory(), dir); set("mapred.output.dir", dir.toString()); } + /** + * Get the {@link InputFormat} implementation for the map-reduce job, + * defaults to {@link TextInputFormat} if not specified explicity. + * + * @return the {@link InputFormat} implementation for the map-reduce job. + */ public InputFormat getInputFormat() { return (InputFormat)ReflectionUtils.newInstance(getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), this); } + + /** + * Set the {@link InputFormat} implementation for the map-reduce job. + * + * @param theClass the {@link InputFormat} implementation for the map-reduce + * job. + */ public void setInputFormat(Class extends InputFormat> theClass) { setClass("mapred.input.format.class", theClass, InputFormat.class); } + + /** + * Get the {@link OutputFormat} implementation for the map-reduce job, + * defaults to {@link TextOutputFormat} if not specified explicity. + * + * @return the {@link OutputFormat} implementation for the map-reduce job. + */ public OutputFormat getOutputFormat() { return (OutputFormat)ReflectionUtils.newInstance(getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), this); } + + /** + * Set the {@link OutputFormat} implementation for the map-reduce job. + * + * @param theClass the {@link OutputFormat} implementation for the map-reduce + * job. + */ public void setOutputFormat(Class extends OutputFormat> theClass) { setClass("mapred.output.format.class", theClass, OutputFormat.class); } @@ -320,6 +498,7 @@ /** * Should the map outputs be compressed before transfer? * Uses the SequenceFile compression. + * * @param compress should the map outputs be compressed? */ public void setCompressMapOutput(boolean compress) { @@ -328,8 +507,9 @@ /** * Are the outputs of the maps be compressed? + * * @returntrue
if the outputs of the maps are to be compressed,
- * false otherwise
+ * false otherwise.
*/
public boolean getCompressMapOutput() {
return getBoolean("mapred.compress.map.output", false);
@@ -337,8 +517,9 @@
/**
* Set the {@link CompressionType} for the map outputs.
+ *
* @param style the {@link CompressionType} to control how the map outputs
- * are compressed
+ * are compressed.
*/
public void setMapOutputCompressionType(CompressionType style) {
set("mapred.map.output.compression.type", style.toString());
@@ -346,8 +527,9 @@
/**
* Get the {@link CompressionType} for the map outputs.
+ *
* @return the {@link CompressionType} for map outputs, defaulting to
- * {@link CompressionType#RECORD}
+ * {@link CompressionType#RECORD}.
*/
public CompressionType getMapOutputCompressionType() {
String val = get("mapred.map.output.compression.type",
@@ -357,8 +539,9 @@
/**
* Set the given class as the {@link CompressionCodec} for the map outputs.
+ *
* @param codecClass the {@link CompressionCodec} class that will compress
- * the map outputs
+ * the map outputs.
*/
public void
setMapOutputCompressorClass(Class extends CompressionCodec> codecClass) {
@@ -368,9 +551,10 @@
/**
* Get the {@link CompressionCodec} for compressing the map outputs.
+ *
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} class that should be used to compress the
- * map outputs
+ * map outputs.
* @throws IllegalArgumentException if the class was specified, but not found
*/
public Class extends CompressionCodec>
@@ -390,10 +574,10 @@
/**
* Get the key class for the map output data. If it is not set, use the
- * (final) output ket class This allows the map output key class to be
- * different than the final output key class
- *
- * @return map output key class
+ * (final) output key class. This allows the map output key class to be
+ * different than the final output key class.
+ *
+ * @return the map output key class.
*/
public Class extends WritableComparable> getMapOutputKeyClass() {
Class extends WritableComparable> retv = getClass("mapred.mapoutput.key.class", null,
@@ -407,7 +591,9 @@
/**
* Set the key class for the map output data. This allows the user to
* specify the map output key class to be different than the final output
- * value class
+ * value class.
+ *
+ * @param theClass the map output key class.
*/
public void setMapOutputKeyClass(Class extends WritableComparable> theClass) {
setClass("mapred.mapoutput.key.class", theClass,
@@ -417,9 +603,9 @@
/**
* Get the value class for the map output data. If it is not set, use the
* (final) output value class This allows the map output value class to be
- * different than the final output value class
- *
- * @return map output value class
+ * different than the final output value class.
+ *
+ * @return the map output value class.
*/
public Class extends Writable> getMapOutputValueClass() {
Class extends Writable> retv = getClass("mapred.mapoutput.value.class", null,
@@ -433,21 +619,38 @@
/**
* Set the value class for the map output data. This allows the user to
* specify the map output value class to be different than the final output
- * value class
+ * value class.
+ *
+ * @param theClass the map output value class.
*/
public void setMapOutputValueClass(Class extends Writable> theClass) {
setClass("mapred.mapoutput.value.class", theClass, Writable.class);
}
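
A sketch of how the intermediate types can differ from the final output types; the WordStats classes are assumed for illustration only:

    JobConf job = new JobConf(WordStats.class);
    // Map emits <Text, IntWritable> ...
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // ... while the job's final output is <Text, LongWritable>.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
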
+ /**
+ * Get the key class for the job output data.
+ *
+ * @return the key class for the job output data.
+ */
public Class extends WritableComparable> getOutputKeyClass() {
return getClass("mapred.output.key.class",
LongWritable.class, WritableComparable.class);
}
+ /**
+ * Set the key class for the job output data.
+ *
+ * @param theClass the key class for the job output data.
+ */
public void setOutputKeyClass(Class extends WritableComparable> theClass) {
setClass("mapred.output.key.class", theClass, WritableComparable.class);
}
+ /**
+ * Get the {@link WritableComparable} comparator used to compare keys.
+ *
+ * @return the {@link WritableComparable} comparator used to compare keys.
+ */
public WritableComparator getOutputKeyComparator() {
Class theClass = getClass("mapred.output.key.comparator.class", null,
WritableComparator.class);
@@ -456,17 +659,24 @@
return WritableComparator.get(getMapOutputKeyClass());
}
+ /**
+ * Set the {@link WritableComparable} comparator used to compare keys.
+ *
+ * @param theClass the {@link WritableComparable} comparator used to
+ * compare keys.
+ * @see #setOutputValueGroupingComparator(Class)
+ */
public void setOutputKeyComparatorClass(Class extends WritableComparator> theClass) {
setClass("mapred.output.key.comparator.class",
theClass, WritableComparator.class);
}
- /** Get the user defined comparator for grouping values.
+ /**
+ * Get the user defined {@link WritableComparable} comparator for
+ * grouping keys of inputs to the reduce.
*
- * This call is used to get the comparator for grouping values by key.
- * @see #setOutputValueGroupingComparator(Class) for details.
- *
- * @return Comparator set by the user for grouping values.
+ * @return comparator set by the user for grouping values.
+ * @see #setOutputValueGroupingComparator(Class) for details.
*/
public WritableComparator getOutputValueGroupingComparator() {
Class theClass = getClass("mapred.output.value.groupfn.class", null,
@@ -478,120 +688,310 @@
return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
}
- /** Set the user defined comparator for grouping values.
+ /**
+ * Set the user defined {@link WritableComparable} comparator for
+ * grouping keys in the input to the reduce.
*
- * For key-value pairs (K1,V1) and (K2,V2), the values are passed
- * in a single call to the map function if K1 and K2 compare as equal.
+ * This comparator should be provided if the equivalence rules for keys + * for sorting the intermediates are different from those for grouping keys + * before each call to + * {@link Reducer#reduce(WritableComparable, java.util.Iterator, OutputCollector, Reporter)}.
+ * + *For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + * in a single call to the reduce function if K1 and K2 compare as equal.
* - * This comparator should be provided if the equivalence rules for keys - * for sorting the intermediates are different from those for grouping - * values. + *Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + * how keys are sorted, this can be used in conjunction to simulate + * secondary sort on values.
+ * + *Note: This is not a guarantee of the reduce sort being + * stable in any sense. (In any case, with the order of available + * map-outputs to the reduce being non-deterministic, it wouldn't make + * that much sense.)
* - * @param theClass The Comparator class to be used for grouping. It should - * extend WritableComparator. + * @param theClass the comparator class to be used for grouping keys. + * It should extendWritableComparator
.
+ * @see #setOutputKeyComparatorClass(Class)
*/
public void setOutputValueGroupingComparator(Class theClass) {
setClass("mapred.output.value.groupfn.class",
theClass, WritableComparator.class);
}
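
A sketch of the secondary-sort wiring described above, using hypothetical comparator and partitioner classes (they are not part of this patch):

    // Sort intermediate keys by checksum then decreasing pagerank, but group
    // reduce inputs by checksum only.
    job.setOutputKeyComparatorClass(ChecksumThenPageRankComparator.class);
    job.setOutputValueGroupingComparator(ChecksumComparator.class);
    job.setPartitionerClass(ChecksumPartitioner.class);
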
+ /**
+ * Get the value class for job outputs.
+ *
+ * @return the value class for job outputs.
+ */
public Class extends Writable> getOutputValueClass() {
return getClass("mapred.output.value.class", Text.class, Writable.class);
}
+
+ /**
+ * Set the value class for job outputs.
+ *
+ * @param theClass the value class for job outputs.
+ */
public void setOutputValueClass(Class extends Writable> theClass) {
setClass("mapred.output.value.class", theClass, Writable.class);
}
-
+ /**
+ * Get the {@link Mapper} class for the job.
+ *
+ * @return the {@link Mapper} class for the job.
+ */
public Class extends Mapper> getMapperClass() {
return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
}
+
+ /**
+ * Set the {@link Mapper} class for the job.
+ *
+ * @param theClass the {@link Mapper} class for the job.
+ */
public void setMapperClass(Class extends Mapper> theClass) {
setClass("mapred.mapper.class", theClass, Mapper.class);
}
+ /**
+ * Get the {@link MapRunnable} class for the job.
+ *
+ * @return the {@link MapRunnable} class for the job.
+ */
public Class extends MapRunnable> getMapRunnerClass() {
return getClass("mapred.map.runner.class",
MapRunner.class, MapRunnable.class);
}
+
+ /**
+ * Expert: Set the {@link MapRunnable} class for the job.
+ *
+ * Typically used to exert greater control on {@link Mapper}s.
+ *
+ * @param theClass the {@link MapRunnable} class for the job.
+ */
public void setMapRunnerClass(Class extends MapRunnable> theClass) {
setClass("mapred.map.runner.class", theClass, MapRunnable.class);
}
+ /**
+ * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
+ * to be sent to the {@link Reducer}s.
+ *
+ * @return the {@link Partitioner} used to partition map-outputs.
+ */
public Class extends Partitioner> getPartitionerClass() {
return getClass("mapred.partitioner.class",
HashPartitioner.class, Partitioner.class);
}
+
+ /**
+ * Set the {@link Partitioner} class used to partition
+ * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
+ *
+ * @param theClass the {@link Partitioner} used to partition map-outputs.
+ */
public void setPartitionerClass(Class extends Partitioner> theClass) {
setClass("mapred.partitioner.class", theClass, Partitioner.class);
}
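
For illustration, a minimal custom Partitioner sketch (not part of the patch) that routes records by the first character of the key:

    public static class FirstCharPartitioner implements Partitioner<Text, Text> {
      public void configure(JobConf job) { }
      public int getPartition(Text key, Text value, int numPartitions) {
        // Empty keys all land in partition 0; others by first code point.
        int c = (key.getLength() == 0) ? 0 : key.charAt(0);
        return (c & Integer.MAX_VALUE) % numPartitions;
      }
    }

    job.setPartitionerClass(FirstCharPartitioner.class);
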
+ /**
+ * Get the {@link Reducer} class for the job.
+ *
+ * @return the {@link Reducer} class for the job.
+ */
public Class extends Reducer> getReducerClass() {
return getClass("mapred.reducer.class",
IdentityReducer.class, Reducer.class);
}
+
+ /**
+ * Set the {@link Reducer} class for the job.
+ *
+ * @param theClass the {@link Reducer} class for the job.
+ */
public void setReducerClass(Class extends Reducer> theClass) {
setClass("mapred.reducer.class", theClass, Reducer.class);
}
+ /**
+ * Get the user-defined combiner class used to combine map-outputs
+ * before being sent to the reducers. Typically the combiner is the same as
+ * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
+ *
+ * @return the user-defined combiner class used to combine map-outputs.
+ */
public Class extends Reducer> getCombinerClass() {
return getClass("mapred.combiner.class", null, Reducer.class);
}
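
Typical wiring, sketched with an assumed WordCount application, where the reducer class doubles as the combiner:

    job.setMapperClass(WordCount.MapClass.class);
    job.setCombinerClass(WordCount.Reduce.class);   // local aggregation of map outputs
    job.setReducerClass(WordCount.Reduce.class);
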
+
+ /**
+ * Set the user-defined combiner class used to combine map-outputs
+ * before being sent to the reducers.
+ *
+ * The combiner is a task-level aggregation operation which, in some cases, + * helps to cut down the amount of data transferred from the {@link Mapper} to + * the {@link Reducer}, leading to better performance.
+ *
+ * Typically the combiner is the same as the Reducer for the
+ * job i.e. {@link #setReducerClass(Class)}.
+ *
+ * Defaults to true.
+ *
+ * @return true if speculative execution is to be used for this job,
+ * false otherwise.
*/
public boolean getSpeculativeExecution() {
return getBoolean("mapred.speculative.execution", true);
}
/**
- * Turn on or off speculative execution for this job.
- * In general, it should be turned off for map jobs that have side effects.
+ * Turn speculative execution on or off for this job.
+ *
+ * @param speculativeExecution true if speculative execution
+ * should be turned on, else false.
*/
- public void setSpeculativeExecution(boolean new_val) {
- setBoolean("mapred.speculative.execution", new_val);
+ public void setSpeculativeExecution(boolean speculativeExecution) {
+ setBoolean("mapred.speculative.execution", speculativeExecution);
}
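
A small sketch of the common case for turning speculative execution off (tasks with external side effects); the scenario is assumed:

    // e.g. a job whose tasks write to an external store and therefore must
    // not be executed twice in parallel.
    job.setSpeculativeExecution(false);
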
+ /**
+ * Get the configured number of map tasks for this job.
+ * Defaults to 1.
+ *
+ * @return the number of map tasks for this job.
+ */
public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
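
A quick check of the block-count arithmetic used in the setNumMapTasks() discussion below, with the input size and blocksize assumed:

    long inputBytes = 10L * 1024 * 1024 * 1024 * 1024;   // 10TB of input
    long blockSize  = 128L * 1024 * 1024;                // 128MB blocks
    long splits     = inputBytes / blockSize;            // 81,920, i.e. ~82,000 maps
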
+
+ /**
+ * Set the number of map tasks for this job.
+ *
+ * Note: This is only a hint to the framework. The actual + * number of spawned map tasks depends on the number of {@link InputSplit}s + * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. + * + * A custom {@link InputFormat} is typically used to accurately control + * the number of map tasks for the job.
+ * + *The number of maps is usually driven by the total size of the inputs + * i.e. total number of blocks of the input files.
+ * + *The right level of parallelism for maps seems to be around 10-100 maps + * per-node, although it has been set up to 300 or so for very cpu-light map + * tasks. Task setup takes awhile, so it is best if the maps take at least a + * minute to execute.
+ * + *The default behavior of file-based {@link InputFormat}s is to split the + * input into logical {@link InputSplit}s based on the total size, in + * bytes, of input files. However, the {@link FileSystem} blocksize of the + * input files is treated as an upper bound for input splits. A lower bound + * on the split size can be set via + * + * mapred.min.split.size.
+ * + *Thus, if you expect 10TB of input data and have a blocksize of 128MB, + * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is + * used to set it even higher.
+ * + * @param n the number of map tasks for this job. + * @see InputFormat#getSplits(JobConf, int) + * @see FileInputFormat + * @see FileSystem#getDefaultBlockSize() + * @see FileStatus#getBlockSize() + */ public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); } + /** + * Get configured the number of reduce tasks for this job. Defaults to + *1
.
+ *
+ * @return the number of reduce tasks for this job.
+ */
public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
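
And a sketch of the reduce-count heuristic described in the setNumReduceTasks() javadoc below, with made-up cluster figures:

    int nodes = 20;                 // hypothetical cluster size
    int maxTasksPerTracker = 2;     // mapred.tasktracker.tasks.maximum
    int reduces = (int) (0.95 * nodes * maxTasksPerTracker);   // 38 reduces
    job.setNumReduceTasks(reduces);
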
+
+ /**
+ * Set the requisite number of reduce tasks for this job.
+ *
+ * The right number of reduces seems to be 0.95 or
+ * 1.75 multiplied by (<no. of nodes> *
+ * mapred.tasktracker.tasks.maximum).
+ *
+ * With 0.95 all of the reduces can launch immediately and
+ * start transferring map outputs as the maps finish. With 1.75
+ * the faster nodes will finish their first round of reduces and launch a
+ * second wave of reduces doing a much better job of load balancing.
Increasing the number of reduces increases the framework overhead, but + * increases load balancing and lowers the cost of failures.
+ * + *The scaling factors above are slightly less than whole numbers to + * reserve a few reduce slots in the framework for speculative-tasks, failures + * etc.
+ *
+ * It is legal to set the number of reduce-tasks to zero.
+ * In this case the output of the map-tasks goes directly to the distributed
+ * file-system, to the path set by {@link #setOutputPath(Path)}. Also, the
+ * framework doesn't sort the map-outputs before writing them out to HDFS.
+ * + * @param n the number of reduce tasks for this job. + */ public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); } - /** Get the configured number of maximum attempts that will be made to run a - * map task, as specified by themapred.map.max.attempts
- * property. If this property is not already set, the default is 4 attempts
- * @return the max number of attempts
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * map task, as specified by the mapred.map.max.attempts
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per map task.
*/
public int getMaxMapAttempts() {
return getInt("mapred.map.max.attempts", 4);
}
- /** Expert: Set the number of maximum attempts that will be made to run a
- * map task
- * @param n the number of attempts
- *
+
+ /**
+ * Expert: Set the number of maximum attempts that will be made to run a
+ * map task.
+ *
+ * @param n the number of attempts per map task.
*/
public void setMaxMapAttempts(int n) {
setInt("mapred.map.max.attempts", n);
}
- /** Get the configured number of maximum attempts that will be made to run a
- * reduce task, as specified by the mapred.reduce.max.attempts
- * property. If this property is not already set, the default is 4 attempts
- * @return the max number of attempts
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * reduce task, as specified by the mapred.reduce.max.attempts
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per reduce task.
*/
public int getMaxReduceAttempts() {
return getInt("mapred.reduce.max.attempts", 4);
}
- /** Expert: Set the number of maximum attempts that will be made to run a
- * reduce task
- * @param n the number of attempts
- *
+ /**
+ * Expert: Set the number of maximum attempts that will be made to run a
+ * reduce task.
+ *
+ * @param n the number of attempts per reduce task.
*/
public void setMaxReduceAttempts(int n) {
setInt("mapred.reduce.max.attempts", n);
@@ -600,7 +1000,8 @@
/**
* Get the user-specified job name. This is only used to identify the
* job to the user.
- * @return the job's name, defaulting to ""
+ *
+ * @return the job's name, defaulting to "".
*/
public String getJobName() {
return get("mapred.job.name", "");
@@ -608,7 +1009,8 @@
/**
* Set the user-specified job name.
- * @param name the job's new name
+ *
+ * @param name the job's new name.
*/
public void setJobName(String name) {
set("mapred.job.name", name);
@@ -627,16 +1029,16 @@
* When not running under HOD, this identifer is expected to remain set to
* the empty string.
*
- * @return the session identifier, defaulting to ""
+ * @return the session identifier, defaulting to "".
*/
public String getSessionId() {
return get("session.id", "");
}
/**
- * Set the user-specified session idengifier.
+ * Set the user-specified session identifier.
*
- * @param sessionId the new session id
+ * @param sessionId the new session id.
*/
public void setSessionId(String sessionId) {
set("session.id", sessionId);
@@ -644,6 +1046,8 @@
/**
* Set the maximum no. of failures of a given job per tasktracker.
+ * If the no. of task failures exceeds noFailures, the
+ * tasktracker is blacklisted for this job.
*
* @param noFailures maximum no. of failures of a given job per tasktracker.
*/
@@ -652,7 +1056,9 @@
}
/**
- * Get the maximum no. of failures of a given job per tasktracker.
+ * Expert: Get the maximum no. of failures of a given job per tasktracker.
+ * If the no. of task failures exceeds this, the tasktracker is
+ * blacklisted for this job.
*
* @return the maximum no. of failures of a given job per tasktracker.
*/
@@ -662,21 +1068,30 @@
/**
* Get the maximum percentage of map tasks that can fail without
- * the job being aborted.
+ * the job being aborted.
+ *
+ * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
+ * attempts before being declared as failed.
+ *
+ * Defaults to zero, i.e. any failed map-task results in
+ * the job being declared as {@link JobStatus#FAILED}.
*
* @return the maximum percentage of map tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public int getMaxMapTaskFailuresPercent() {
return getInt("mapred.max.map.failures.percent", 0);
}
/**
- * Set the maximum percentage of map tasks that can fail without the job
- * being aborted.
+ * Expert: Set the maximum percentage of map tasks that can fail without the
+ * job being aborted.
+ *
+ * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
+ * before being declared as failed.
*
* @param percent the maximum percentage of map tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public void setMaxMapTaskFailuresPercent(int percent) {
setInt("mapred.max.map.failures.percent", percent);
@@ -684,10 +1099,16 @@
/**
* Get the maximum percentage of reduce tasks that can fail without
- * the job being aborted.
+ * the job being aborted.
+ *
+ * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
+ * attempts before being declared as failed.
+ *
+ * Defaults to zero, i.e. any failed reduce-task results
+ * in the job being declared as {@link JobStatus#FAILED}.
*
* @return the maximum percentage of reduce tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public int getMaxReduceTaskFailuresPercent() {
return getInt("mapred.max.reduce.failures.percent", 0);
@@ -697,24 +1118,29 @@
* Set the maximum percentage of reduce tasks that can fail without the job
* being aborted.
*
+ * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
+ * attempts before being declared as failed.
+ *
* @param percent the maximum percentage of reduce tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public void setMaxReduceTaskFailuresPercent(int percent) {
setInt("mapred.max.reduce.failures.percent", percent);
}
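
For example (the values are picked arbitrarily), a job that can tolerate a small fraction of bad input splits might be configured as:

    job.setMaxMapAttempts(4);                   // give each map up to 4 tries
    job.setMaxMapTaskFailuresPercent(5);        // tolerate up to 5% failed maps
    job.setMaxReduceTaskFailuresPercent(0);     // but no failed reduces
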
/**
- * Set job priority for this job.
+ * Set {@link JobPriority} for this job.
*
- * @param prio
+ * @param prio the {@link JobPriority} for this job.
*/
public void setJobPriority(JobPriority prio) {
set("mapred.job.priority", prio.toString());
}
/**
- * Get the job priority for this job.
+ * Get the {@link JobPriority} for this job.
+ *
+ * @return the {@link JobPriority} for this job.
*/
public JobPriority getJobPriority() {
String prio = get("mapred.job.priority");
@@ -725,12 +1151,44 @@
return JobPriority.valueOf(prio);
}
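
A one-line sketch of the corresponding setter in use:

    job.setJobPriority(JobPriority.LOW);   // run this batch job at low priority
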
+ /**
+ * Get the uri to be invoked in-order to send a notification after the job
+ * has completed (success/failure).
+ *
+ * @return the job end notification uri, null if it hasn't
+ * been set.
+ * @see #setJobEndNotificationURI(String)
+ */
+ public String getJobEndNotificationURI() {
+ return get("job.end.notification.url");
+ }
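
A sketch of the matching setter (documented further below), with a hypothetical callback URL; $jobId and $jobStatus are substituted by the framework:

    job.setJobEndNotificationURI(
        "http://example.com:8080/jobdone?id=$jobId&status=$jobStatus");
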
+
+ /**
+ * Set the uri to be invoked in-order to send a notification after the job
+ * has completed (success/failure).
+ *
+ * The uri can contain 2 special parameters: $jobId and + * $jobStatus. Those, if present, are replaced by the job's + * identifier and completion-status respectively.
+ * + *This is typically used by application-writers to implement chaining of + * Map-Reduce jobs in an asynchronous manner.
+ * + * @param uri the job end notification uri + * @see JobStatus + * @see Job Completion and Chaining + */ + public void setJobEndNotificationURI(String uri) { + set("job.end.notification.url", uri); + } - /** Find a jar that contains a class of the same name, if any. + /** + * Find a jar that contains a class of the same name, if any. * It will return a jar file, even if that is not the first thing * on the class path that has a class with the same name. - * @param my_class the class to find - * @return a jar file that contains the class, or null + * + * @param my_class the class to find. + * @return a jar file that contains the class, or null. * @throws IOException */ private static String findContainingJar(Class my_class) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Oct 24 14:18:51 2007 @@ -95,7 +95,7 @@ private static JobEndStatusInfo createNotification(JobConf conf, JobStatus status) { JobEndStatusInfo notification = null; - String uri = conf.get("job.end.notification.url"); + String uri = conf.getJobEndNotificationURI(); if (uri != null) { // +1 to make logic for first notification identical to a retry int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapReduceBase.java Wed Oct 24 14:18:51 2007 @@ -23,8 +23,11 @@ import org.apache.hadoop.io.Closeable; import org.apache.hadoop.mapred.JobConfigurable; -/** Base class for {@link Mapper} and {@link Reducer} implementations. - * Provides default implementations for a few methods. +/** + * Base class for {@link Mapper} and {@link Reducer} implementations. + * + *Provides default no-op implementations for a few methods, most non-trivial + * applications need to override some of them.
*/ public class MapReduceBase implements Closeable, JobConfigurable { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java Wed Oct 24 14:18:51 2007 @@ -23,15 +23,28 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Expert: Permits greater control of map processing. For example, - * implementations might perform multi-threaded, asynchronous mappings. */ +/** + * Expert: Generic interface for {@link Mapper}s. + * + *Custom implementations of MapRunnable
can exert greater
+ * control on map processing e.g. multi-threaded, asynchronous mappers etc.
Mapping of input records to output records is complete when this method + * returns.
+ * + * @param input the {@link RecordReader} to read the input records. + * @param output the {@link OutputCollector} to collect the outputrecords. + * @param reporter {@link Reporter} to report progress, status-updates etc. + * @throws IOException */ void run(RecordReaderMaps are the individual tasks which transform input records into a + * intermediate records. The transformed intermediate records need not be of + * the same type as the input records. A given input pair may map to zero or + * many output pairs.
+ * + *The Hadoop Map-Reduce framework spawns one map task for each
+ * {@link InputSplit} generated by the {@link InputFormat} for the job.
+ * Mapper
implementations can access the {@link JobConf} for the
+ * job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ * themselves. Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.
The framework then calls
+ * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)}
+ * for each key/value pair in the InputSplit
for that task.
All intermediate values associated with a given output key are
+ * subsequently grouped by the framework, and passed to a {@link Reducer} to
+ * determine the final output. Users can control the grouping by specifying
+ * a Comparator
via
+ * {@link JobConf#setOutputKeyComparatorClass(Class)}.
The grouped Mapper
outputs are partitioned per
+ * Reducer
. Users can control which keys (and hence records) go to
+ * which Reducer
by implementing a custom {@link Partitioner}.
+ *
+ *
Users can optionally specify a combiner
, via
+ * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the
+ * intermediate outputs, which helps to cut down the amount of data transferred
+ * from the Mapper
to the Reducer
.
+ *
+ *
The intermediate, grouped outputs are always stored in
+ * {@link SequenceFile}s. Applications can specify if and how the intermediate
+ * outputs are to be compressed and which {@link CompressionCodec}s are to be
+ * used via the JobConf
.
If the job has
+ * zero
+ * reduces then the output of the Mapper
is directly written
+ * to the {@link FileSystem} without grouping by keys.
Example:
+ *+ * public class MyMapper<K extends WritableComparable, V extends Writable> + * extends MapReduceBase implements Mapper<K, V, K, V> { + * + * static enum MyCounters { NUM_RECORDS } + * + * private String mapTaskId; + * private String inputFile; + * private int noRecords = 0; + * + * public void configure(JobConf job) { + * mapTaskId = job.get("mapred.task.id"); + * inputFile = job.get("mapred.input.file"); + * } + * + * public void map(K key, V val, + * OutputCollector<K, V> output, Reporter reporter) + * throws IOException { + * // Process the <key, value> pair (assume this takes a while) + * // ... + * // ... + * + * // Let the framework know that we are alive, and kicking! + * // reporter.progress(); + * + * // Process some more + * // ... + * // ... + * + * // Increment the no. of <key, value> pairs processed + * ++noRecords; + * + * // Increment counters + * reporter.incrCounter(NUM_RECORDS, 1); + * + * // Every 100 records update application-level status + * if ((noRecords%100) == 0) { + * reporter.setStatus(mapTaskId + " processed " + noRecords + + * " from input-file: " + inputFile); + * } + * + * // Output the result + * output.collect(key, val); + * } + * } + * + *Applications may write a custom {@link MapRunnable} to exert greater + * control on map processing e.g. multi-threaded
+ * + * @see JobConf + * @see InputFormat + * @see Partitioner + * @see Reducer + * @see MapReduceBase + * @see MapRunnable + * @see SequenceFile + */ public interface MapperMapper
s etc.extends JobConfigurable, Closeable { - /** Maps a single input key/value pair into intermediate key/value pairs. - * Output pairs need not be of the same types as input pairs. A given input - * pair may map to zero or many output pairs. Output pairs are collected - * with calls to {@link - * OutputCollector#collect(WritableComparable,Writable)}. + /** + * Maps a single input key/value pair into an intermediate key/value pair. + * + * Output pairs need not be of the same types as input pairs. A given + * input pair may map to zero or many output pairs. Output pairs are + * collected with calls to + * {@link OutputCollector#collect(WritableComparable,Writable)}.
* - * @param key the key - * @param value the values - * @param output collects mapped keys and values + *Applications can use the {@link Reporter} provided to report progress + * or just indicate that they are alive. In scenarios where the application + * takes an insignificant amount of time to process individual key/value + * pairs, this is crucial since the framework might assume that the task has + * timed-out and kill that task. The other way of avoiding this is to set + * + * mapred.task.timeout to a high-enough value (or even zero for no + * time-outs).
+ * + * @param key the input key. + * @param value the input value. + * @param output collects mapped keys and values. + * @param reporter facility to report progress. */ - void map(K1 key, V1 value, - OutputCollectoroutput, Reporter reporter) - throws IOException; + void map(K1 key, V1 value, OutputCollector output, Reporter reporter) + throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java Wed Oct 24 14:18:51 2007 @@ -24,15 +24,23 @@ import org.apache.hadoop.io.WritableComparable; -/** Passed to {@link Mapper} and {@link Reducer} implementations to collect - * output data. */ +/** + * Collects the <key, value>
pairs output by {@link Mapper}s + * and {@link Reducer}s. + * + *+ */ public interface OutputCollector
OutputCollector
is the generalization of the facility + * provided by the Map-Reduce framework to collect data output by either the + *Mapper
or theReducer
i.e. intermediate outputs + * or the output of the job.{ /** Adds a key/value pair to the output. * - * @param key the key to add - * @param value to value to add + * @param key the key to collect. + * @param value to value to collect. + * @throws IOException */ void collect(K key, V value) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Wed Oct 24 14:18:51 2007 @@ -25,28 +25,53 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; -/** An output data format. Output files are stored in a {@link - * FileSystem}. */ +/** + * OutputFormat
describes the output-specification for a + * Map-Reduce job. + * + *The Map-Reduce framework relies on the
OutputFormat
of the + * job to:+ *
+ *
+ * + * @see RecordWriter + * @see JobConf + */ public interface OutputFormat- + * Validate the output-specification of the job. For e.g. check that the + * output directory doesn't already exist. + *
- + * Provide the {@link RecordWriter} implementation to be used to write out + * the output files of the job. Output files are stored in a + * {@link FileSystem}. + *
+ *{ - /** Construct a {@link RecordWriter} with Progressable. + /** + * Get the {@link RecordWriter} for the given job. * - * @param job the job whose output is being written - * @param name the unique name for this part of the output - * @param progress mechanism for reporting progress while writing to file - * @return a {@link RecordWriter} + * @param ignored + * @param job configuration for the job whose output is being written. + * @param name the unique name for this part of the output. + * @param progress mechanism for reporting progress while writing to file. + * @return a {@link RecordWriter} to write the output for the job. + * @throws IOException */ RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) - throws IOException; + throws IOException; - /** Check whether the output specification for a job is appropriate. Called - * when a job is submitted. Typically checks that it does not already exist, + /** + * Check for validity of the output-specification for the job. + * + * This is to validate the output specification for the job when it is + * a job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not - * overwritten. + * overwritten.
* - * @param job the job whose output will be written + * @param ignored + * @param job job configuration. * @throws IOException when output should not be attempted */ void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java Wed Oct 24 14:18:51 2007 @@ -21,18 +21,32 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Partitions the key space. A partition is created for each reduce task. */ +/** + * Partitions the key space. + * + *+ * + * @see Reducer + */ public interface Partitioner
Partitioner
controls the partitioning of the keys of the + * intermediate map-outputs. The key (or a subset of the key) is used to derive + * the partition, typically by a hash function. The total number of partitions + * is the same as the number of reduce tasks for the job. Hence this controls + * which of them
reduce tasks the intermediate key (and hence the + * record) is sent for reduction.extends JobConfigurable { - /** Returns the paritition number for a given entry given the total number of - * partitions. Typically a hash function on a all or a subset of the key. + /** + * Get the paritition number for a given key (hence record) given the total + * number of partitions i.e. number of reduce-tasks for the job. + * + * Typically a hash function on a all or a subset of the key.
* - * @param key the entry key - * @param value the entry value - * @param numPartitions the number of partitions - * @return the partition number + * @param key the key to be paritioned. + * @param value the entry value. + * @param numPartitions the total number of partitions. + * @return the partition number for thekey
. */ int getPartition(K2 key, V2 value, int numPartitions); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java Wed Oct 24 14:18:51 2007 @@ -24,11 +24,23 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Reads key/value pairs from an input file {@link FileSplit}. - * Implemented by {@link InputFormat} implementations. */ +/** + *RecordReader
reads <key, value> pairs from an + * {@link InputSplit}. + * + *+ * + * @see InputSplit + * @see InputFormat + */ public interface RecordReader
RecordReader
, typically, converts the byte-oriented view of + * the input, provided by theInputSplit
, and presents a + * record-oriented view for the {@link Mapper} & {@link Reducer} tasks for + * processing. It thus assumes the responsibility of processing record + * boundaries and presenting the tasks with keys and values.{ - /** Reads the next key/value pair. + /** + * Reads the next key/value pair from the input for processing. * * @param key the key to read data into * @param value the value to read data into @@ -40,25 +52,38 @@ /** * Create an object of the appropriate type to be used as a key. - * @return a new key object + * + * @return a new key object. */ K createKey(); /** - * Create an object of the appropriate type to be used as the value. - * @return a new value object + * Create an object of the appropriate type to be used as a value. + * + * @return a new value object. */ V createValue(); - /** Returns the current position in the input. */ + /** + * Returns the current position in the input. + * + * @return the current position in the input. + * @throws IOException + */ long getPos() throws IOException; - /** Close this to future operations.*/ + /** + * Close this {@link InputSplit} to future operations. + * + * @throws IOException + */ public void close() throws IOException; /** - * How far has the reader gone through the input. - * @return progress from 0.0 to 1.0 + * How much of the input has the {@link RecordReader} consumed i.e. + * has been processed by? + * + * @return progress from 0.0
to1.0
. * @throws IOException */ float getProgress() throws IOException; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java Wed Oct 24 14:18:51 2007 @@ -21,22 +21,36 @@ import java.io.IOException; import java.io.DataOutput; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; -/** Writes key/value pairs to an output file. Implemented by {@link - * OutputFormat} implementations. */ +/** + *RecordWriter
writes the output <key, value> pairs + * to an output file. + + *+ * + * @see Mapper + * @see Partitioner + * @see Reporter + * @see MapReduceBase + */ public interface Reducer
RecordWriter
implementations write the job outputs to the + * {@link FileSystem}. + * + * @see OutputFormat + */ public interface RecordWriter{ - /** Writes a key/value pair. - * - * @param key the key to write - * @param value the value to write + /** + * Writes a key/value pair. * + * @param key the key to write. + * @param value the value to write. + * @throws IOException * @see Writable#write(DataOutput) */ void write(K key, V value) throws IOException; - /** Close this to future operations.*/ + /** + * Close this RecordWriter
to future operations. + * + * @param reporter facility to report progress. + * @throws IOException + */ void close(Reporter reporter) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java Wed Oct 24 14:18:51 2007 @@ -22,24 +22,176 @@ import java.util.Iterator; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Closeable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -/** Reduces a set of intermediate values which share a key to a smaller set of - * values. Input values are the grouped output of a {@link Mapper}. */ +/** + * Reduces a set of intermediate values which share a key to a smaller set of + * values. + * + *The number of
+ + *Reducer
s for the job is set by the user via + * {@link JobConf#setNumReduceTasks(int)}.Reducer
implementations + * can access the {@link JobConf} for the job via the + * {@link JobConfigurable#configure(JobConf)} method and initialize themselves. + * Similarly they can use the {@link Closeable#close()} method for + * de-initialization.+ *
Reducer
has 3 primary phases:+ *
+ * + *- + * + *
+ * + *Shuffle
+ * + *+ *
Reducer
is input the grouped output of a {@link Mapper}. + * In the phase the framework, for eachReducer
, fetches the + * relevant partition of the output of all theMapper
s, via HTTP. + *- + *
+ * + *Sort
+ * + *The framework groups
+ * + *Reducer
inputs bykey
s + * (since differentMapper
s may have output the same key) in this + * stage.The shuffle and sort phases occur simultaneously i.e. while outputs are + * being fetched they are merged.
+ * + *SecondarySort
+ * + *If equivalence rules for keys while grouping the intermediates are + * different from those for grouping keys before reduction, then one may + * specify a
+ * + * + * For example, say that you want to find duplicate web pages and tag them + * all with the url of the "best" known example. You would set up the job + * like: + *Comparator
via + * {@link JobConf#setOutputValueGroupingComparator(Class)}.Since + * {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to + * control how intermediate keys are grouped, these can be used in conjunction + * to simulate secondary sort on values.+ *
+ *- Map Input Key: url
+ *- Map Input Value: document
+ *- Map Output Key: document checksum, url pagerank
+ *- Map Output Value: url
+ *- Partitioner: by checksum
+ *- OutputKeyComparator: by checksum and then decreasing pagerank
+ *- OutputValueGroupingComparator: by checksum
+ *- + *
+ *Reduce
+ * + *In this phase the + * {@link #reduce(WritableComparable, Iterator, OutputCollector, Reporter)} + * method is called for each
+ *<key, (list of values)>
pair in + * the grouped inputs.The output of the reduce task is typically written to the + * {@link FileSystem} via + * {@link OutputCollector#collect(WritableComparable, Writable)}.
+ *The output of the
+ * + *Reducer
is not re-sorted.Example:
+ *+ * public class MyReducer<K extends WritableComparable, V extends Writable> + * extends MapReduceBase implements Reducer<K, V, K, V> { + * + * static enum MyCounters { NUM_RECORDS } + * + * private String reduceTaskId; + * private int noKeys = 0; + * + * public void configure(JobConf job) { + * reduceTaskId = job.get("mapred.task.id"); + * } + * + * public void reduce(K key, Iterator<V> values, + * OutputCollector<K, V> output, + * Reporter reporter) + * throws IOException { + * + * // Process + * int noValues = 0; + * while (values.hasNext()) { + * V value = values.next(); + * + * // Increment the no. of values for this key + * ++noValues; + * + * // Process the <key, value> pair (assume this takes a while) + * // ... + * // ... + * + * // Let the framework know that we are alive, and kicking! + * if ((noValues%10) == 0) { + * reporter.progress(); + * } + * + * // Process some more + * // ... + * // ... + * + * // Output the <key, value> + * output.collect(key, value); + * } + * + * // Increment the no. of <key, list of values> pairs processed + * ++noKeys; + * + * // Increment counters + * reporter.incrCounter(NUM_RECORDS, 1); + * + * // Every 100 keys update application-level status + * if ((noKeys%100) == 0) { + * reporter.setStatus(reduceTaskId + " processed " + noKeys); + * } + * } + * } + *extends JobConfigurable, Closeable { - /** Combines values for a given key. Output values must be of the same type - * as input values. Input keys must not be altered. Typically all values - * are combined into zero or one value. Output pairs are collected with - * calls to {@link OutputCollector#collect(WritableComparable,Writable)}. + /** + * Reduces values for a given key. + * + * The framework calls this method for each + *
+ * + *<key, (list of values)>
pair in the grouped inputs. + * Output values must be of the same type as input values. Input keys must + * not be altered. Typically all values are combined into zero or one value. + *Output pairs are collected with calls to + * {@link OutputCollector#collect(WritableComparable,Writable)}.
* - * @param key the key - * @param values the values to combine - * @param output to collect combined values + *Applications can use the {@link Reporter} provided to report progress + * or just indicate that they are alive. In scenarios where the application + * takes an insignificant amount of time to process individual key/value + * pairs, this is crucial since the framework might assume that the task has + * timed-out and kill that task. The other way of avoiding this is to set + * + * mapred.task.timeout to a high-enough value (or even zero for no + * time-outs).
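+ *
+ * As an illustration (not part of this interface), given the job's
+ * {@link JobConf} (here job), an application that expects long
+ * gaps between records could raise the limit explicitly; the value is in
+ * milliseconds:
+ *
+ *     job.setLong("mapred.task.timeout", 30 * 60 * 1000);  // 30 minutes
+ *     // or: job.setLong("mapred.task.timeout", 0);        // no time-outs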
+ * + * @param key the key. + * @param values the list of values to reduce. + * @param output to collect keys and combined values. + * @param reporter facility to report progress. */ void reduce(K2 key, Iteratorvalues, OutputCollector output, Reporter reporter) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Oct 24 14:18:51 2007 @@ -18,11 +18,24 @@ package org.apache.hadoop.mapred; -import java.io.IOException; - import org.apache.hadoop.util.Progressable; -/** Passed to application code to permit alteration of status. */ +/** + * A facility for Map-Reduce applications to report progress and update + * counters, status information etc. + * + * {@link Mapper} and {@link Reducer} can use the
Reporter
+ * provided to report progress or just indicate that they are alive. In + * scenarios where the application takes an insignificant amount of time to + * process individual key/value pairs, this is crucial since the framework + * might assume that the task has timed-out and kill that task. + * + *Applications can also update {@link Counters} via the provided + *
+ * + * @see Progressable + * @see Counters + */ public interface Reporter extends Progressable { /** @@ -41,25 +54,27 @@ }; /** - * Alter the application's status description. + * Set the status description for the task. * - * @param status - * a brief description of the current status + * @param status brief description of the current status. */ public abstract void setStatus(String status); /** * Increments the counter identified by the key, which can be of - * any enum type, by the specified amount. - * @param key A value of any enum type + * any {@link Enum} type, by the specified amount. + * + * @param key key to identify the counter to be incremented. The key can be + * be anyReporter
.Enum
. * @param amount A non-negative amount by which the counter is to - * be incremented + * be incremented. */ public abstract void incrCounter(Enum key, long amount); /** - * Get the InputSplit object for a map. - * @return the input split that the map is reading from + * Get the {@link InputSplit} object for a map. + * + * @return theInputSplit
that the map is reading from. * @throws UnsupportedOperationException if called outside a mapper */ public abstract InputSplit getInputSplit() Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Wed Oct 24 14:18:51 2007 @@ -21,78 +21,120 @@ import java.io.*; /** - * Includes details on a running MapReduce job. A client can - * track a living job using this object. + *RunningJob
is the user-interface to query for details on a + * running Map-Reduce job. + * + *Clients can get hold of
+ * + * @see JobClient */ public interface RunningJob { /** - * Returns an identifier for the job + * Get the job identifier. + * + * @return the job identifier. */ public String getJobID(); /** - * Returns the name of the job + * Get the name of the job. + * + * @return the name of the job. */ public String getJobName(); /** - * Returns the path of the submitted job. + * Get the path of the submitted job configuration. + * + * @return the path of the submitted job configuration. */ public String getJobFile(); /** - * Returns a URL where some job progress information will be displayed. + * Get the URL where some job progress information will be displayed. + * + * @return the URL where some job progress information will be displayed. */ public String getTrackingURL(); /** - * Returns a float between 0.0 and 1.0, indicating progress on - * the map portion of the job. When all map tasks have completed, - * the function returns 1.0. + * Get the progress of the job's map-tasks, as a float between 0.0 + * and 1.0. When all map tasks have completed, the function returns 1.0. + * + * @return the progress of the job's map-tasks. + * @throws IOException */ public float mapProgress() throws IOException; /** - * Returns a float between 0.0 and 1.0, indicating progress on - * the reduce portion of the job. When all reduce tasks have completed, - * the function returns 1.0. + * Get the progress of the job's reduce-tasks, as a float between 0.0 + * and 1.0. When all reduce tasks have completed, the function returns 1.0. + * + * @return the progress of the job's reduce-tasks. + * @throws IOException */ public float reduceProgress() throws IOException; /** - * Non-blocking function to check whether the job is finished or not. + * Check if the job is finished or not. + * This is a non-blocking call. + * + * @returnRunningJob
via the {@link JobClient} + * and then query the running-job for details such as name, configuration, + * progress etc.true
if the job is complete, elsefalse
. + * @throws IOException */ public boolean isComplete() throws IOException; /** - * True iff job completed successfully. + * Check if the job completed successfully. + * + * @returntrue
if the job succeeded, elsefalse
. + * @throws IOException */ public boolean isSuccessful() throws IOException; /** * Blocks until the job is complete. + * + * @throws IOException */ public void waitForCompletion() throws IOException; /** * Kill the running job. Blocks until all job tasks have been * killed as well. If the job is no longer running, it simply returns. + * + * @throws IOException */ public void killJob() throws IOException; - public TaskCompletionEvent[] getTaskCompletionEvents( - int startFrom) throws IOException; + /** + * Get events indicating completion (success/failure) of component tasks. + * + * @param startFrom index to start fetching events from + * @return an array of {@link TaskCompletionEvent}s + * @throws IOException + */ + public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) + throws IOException; /** * Kill indicated task attempt. - * @param taskId the id of the task to kill. - * @param shouldFail if true the task is failed and added to failed tasks list, otherwise - * it is just killed, w/o affecting job failure status. + * + * @param taskId the id of the task to be terminated. + * @param shouldFail if true the task is failed and added to failed tasks + * list, otherwise it is just killed, w/o affecting + * job failure status. + * @throws IOException */ public void killTask(String taskId, boolean shouldFail) throws IOException; /** * Gets the counters for this job. + * + * @return the counters for this job. + * @throws IOException */ public Counters getCounters() throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html?rev=588033&r1=588032&r2=588033&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/package.html Wed Oct 24 14:18:51 2007 @@ -1,16 +1,212 @@ -A system for scalable, fault-tolerant, distributed computation over -large data collections.
+A software framework for easily writing applications which process vast
+amounts of data (multi-terabyte data-sets) in parallel on large clusters
+(thousands of nodes) built of commodity hardware in a reliable, fault-tolerant
+manner.
-Applications implement {@link org.apache.hadoop.mapred.Mapper} and -{@link org.apache.hadoop.mapred.Reducer} interfaces. These are submitted -as a {@link org.apache.hadoop.mapred.JobConf} and are applied to data -stored in a {@link org.apache.hadoop.fs.FileSystem}.
+A Map-Reduce job usually splits the input data-set into independent
+chunks which are processed by the map tasks in a completely parallel
+manner, followed by the reduce tasks which aggregate their output.
+Typically both the input and the output of the job are stored in a
+{@link org.apache.hadoop.fs.FileSystem}. The framework takes care of
+monitoring tasks and re-executing failed ones. Since, usually, the compute
+nodes and the storage nodes are the same, i.e. Hadoop's Map-Reduce framework
+and the Distributed FileSystem run on the same set of nodes, tasks are
+effectively scheduled on the nodes where data is already present, resulting
+in very high aggregate bandwidth across the cluster.
-See Google's -original Map/Reduce paper for background information.
+The Map-Reduce framework operates exclusively on <key, value> +pairs i.e. the input to the job is viewed as a set of <key, value> +pairs and the output as another, possibly different, set of +<key, value> pairs. The keys and values have to +be serializable as {@link org.apache.hadoop.io.Writable}s and additionally the +keys have to be {@link org.apache.hadoop.io.WritableComparable}s in +order to facilitate grouping by the framework.
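+
+As an illustration (not part of this page), a job whose keys are words and
+whose values are counts could declare the standard Text and IntWritable
+types; the WordCount driver class named here is hypothetical:
+
+  JobConf job = new JobConf(WordCount.class);
+  job.setOutputKeyClass(Text.class);          // key type: a WritableComparable
+  job.setOutputValueClass(IntWritable.class); // value type: a Writable
+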
+ +Data flow:
+                     (input) <k1, v1>
+                            |
+                            V
+                           map
+                            |
+                            V
+                        <k2, v2>
+                            |
+                            V
+                         combine
+                            |
+                            V
+                        <k2, v2>
+                            |
+                            V
+                         reduce
+                            |
+                            V
+                     <k3, v3> (output)
+
+Applications typically implement
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)}
+and
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+methods. The application-writer also specifies various facets of the job, such
+as the input and output locations and the Partitioner, InputFormat
+and OutputFormat implementations to be used, as
+a {@link org.apache.hadoop.mapred.JobConf}. The client program,
+{@link org.apache.hadoop.mapred.JobClient}, then submits the job to the framework
+and optionally monitors it.
+
+The framework spawns one map task per
+{@link org.apache.hadoop.mapred.InputSplit} generated by the
+{@link org.apache.hadoop.mapred.InputFormat} of the job and calls
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)}
+with each <key, value> pair read by the
+{@link org.apache.hadoop.mapred.RecordReader} from the InputSplit for
+the task. The intermediate outputs of the maps are then grouped by key
+and optionally aggregated by the combiner. The key space of the intermediate
+outputs is partitioned by the {@link org.apache.hadoop.mapred.Partitioner}, where
+the number of partitions is exactly the number of reduce tasks for the job.
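+
+For instance (illustrative only, not part of this page), a job that wants its
+output split across ten partitions would ask for ten reduce tasks; every
+partition index returned by the Partitioner must then fall in [0, 10):
+
+  job.setNumReduceTasks(10);                       // ten partitions/reduces
+  job.setPartitionerClass(HashPartitioner.class);  // the default partitioner
+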
+
+The reduce tasks fetch the sorted intermediate outputs of the maps, via HTTP,
+merge the <key, value> pairs and call
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+for each <key, list of values> pair. The output of the reduce tasks is
+stored on the FileSystem by the
+{@link org.apache.hadoop.mapred.RecordWriter} provided by the
+{@link org.apache.hadoop.mapred.OutputFormat} of the job.
+ +Map-Reduce application to perform a distributed grep:
+
+public class Grep extends Configured implements Tool {
+
+  // map: search each input line for the pattern given by 'grep.mapper.regex'
+  // and 'grep.mapper.regex.group'
+
+  static class GrepMapper<K extends WritableComparable>
+  extends MapReduceBase implements Mapper<K, Text, Text, LongWritable> {
+
+    private Pattern pattern;
+    private int group;
+
+    public void configure(JobConf job) {
+      pattern = Pattern.compile(job.get("grep.mapper.regex"));
+      group = job.getInt("grep.mapper.regex.group", 0);
+    }
+
+    public void map(K key, Text value,
+                    OutputCollector<Text, LongWritable> output,
+                    Reporter reporter)
+    throws IOException {
+      String text = value.toString();
+      Matcher matcher = pattern.matcher(text);
+      while (matcher.find()) {
+        output.collect(new Text(matcher.group(group)), new LongWritable(1));
+      }
+    }
+  }
+
+  // reduce: count the number of occurrences of each matched string
+
+  static class GrepReducer<K extends WritableComparable> extends MapReduceBase
+  implements Reducer<K, LongWritable, K, LongWritable> {
+
+    public void reduce(K key, Iterator<LongWritable> values,
+                       OutputCollector<K, LongWritable> output,
+                       Reporter reporter)
+    throws IOException {
+
+      // sum all values for this key
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+
+      // output sum
+      output.collect(key, new LongWritable(sum));
+    }
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    JobConf grepJob = new JobConf(getConf(), Grep.class);
+
+    grepJob.setJobName("grep");
+
+    grepJob.setInputPath(new Path(args[0]));
+    grepJob.setOutputPath(new Path(args[1]));
+
+    grepJob.setMapperClass(GrepMapper.class);
+    grepJob.setCombinerClass(GrepReducer.class);
+    grepJob.setReducerClass(GrepReducer.class);
+
+    // use the same keys that GrepMapper.configure() reads
+    grepJob.set("grep.mapper.regex", args[2]);
+    if (args.length == 4)
+      grepJob.set("grep.mapper.regex.group", args[3]);
+
+    grepJob.setOutputFormat(SequenceFileOutputFormat.class);
+    grepJob.setOutputKeyClass(Text.class);
+    grepJob.setOutputValueClass(LongWritable.class);
+
+    JobClient.runJob(grepJob);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Grep(), args);
+    System.exit(res);
+  }
+
+}
+
+
+Notice how the data-flow of the above grep job is very similar to doing the
+same via the unix pipeline:
+ ++cat input/* | grep | sort | uniq -c > out ++ ++ input | map | shuffle | reduce > out ++ +Hadoop Map-Reduce applications need not be written in +JavaTM only. +Hadoop Streaming is a utility +which allows users to create and run jobs with any executables (e.g. shell +utilities) as the mapper and/or the reducer. +Hadoop Pipes is a +SWIG-compatible C++ API to implement +Map-Reduce applications (non JNITM based).
+ +See Google's original +Map/Reduce paper for background information.
+ +Java and JNI are trademarks or registered trademarks of +Sun Microsystems, Inc. in the United States and other countries.