Typically the compute nodes and the storage nodes are the same, that is,
the MapReduce framework and the
- Hadoop Distributed File System (HDFS)
+ Hadoop
+ Distributed File System (HDFS)
are running on the same set of nodes. This configuration
allows the framework to effectively schedule tasks on the nodes where data
is already present, resulting in very high aggregate bandwidth across the
@@ -117,11 +118,14 @@
the job, conceivably of different types. The key and value
classes have to be
- serializable by the framework and hence need to implement the
- Writable
- interface. Additionally, the key
- classes have to implement the
+ serializable by the framework. Several serialization systems exists; the
+ default serialization mechanism requires keys and values to implement
+ the
+ Writable interface.
+ Additionally, the key
+ classes must facilitate sorting by the
+ framework; a straightforward means to do so is for them to implement the
- WritableComparable interface to facilitate sorting by the framework.
+ WritableComparable interface.
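As an illustrative sketch (not part of this patch), a custom key type
satisfying both requirements might look like the following; the class
name YearMonthKey and its fields are hypothetical:

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableComparable;

  // A composite key: serializable via write()/readFields(), and
  // sortable by the framework via compareTo().
  public class YearMonthKey implements WritableComparable<YearMonthKey> {
    private int year;
    private int month;

    public YearMonthKey() {}  // no-arg constructor for deserialization

    public void write(DataOutput out) throws IOException {
      out.writeInt(year);
      out.writeInt(month);
    }

    public void readFields(DataInput in) throws IOException {
      year = in.readInt();
      month = in.readInt();
    }

    public int compareTo(YearMonthKey o) {
      if (year != o.year) { return year < o.year ? -1 : 1; }
      if (month != o.month) { return month < o.month ? -1 : 1; }
      return 0;
    }
  }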
Input and Output types of a MapReduce job:
(input) <k1, v1>
->
map
->
<k2, v2>
->
- combine
+ combine*
->
<k2, v2>
->
@@ -140,6 +144,8 @@
reduce
->
<k3, v3>
(output)
+ Note that the combine phase may run zero or more times in this process.
package org.myorg;
- import java.io.IOException;
- import java.util.*;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.conf.*;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.util.*;
- public class WordCount {
-
- public static class Map extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
-
-
- private final static IntWritable one = new IntWritable(1);
-
- private Text word = new Text();
-
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output,
- Reporter reporter) throws IOException {
-
- String line = value.toString();
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()) {
- word.set(tokenizer.nextToken());
- output.collect(word, one);
- }
- }
- }
-
- public static class Reduce extends MapReduceBase implements
- Reducer<Text, IntWritable, Text, IntWritable> {
-
-
- public void reduce(Text key, Iterator<IntWritable> values,
- OutputCollector<Text, IntWritable> output,
- Reporter reporter) throws IOException {
-
- int sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
- }
- output.collect(key, new IntWritable(sum));
- }
- }
-
- public static void main(String[] args) throws Exception {
-
-
- JobConf conf = new JobConf(WordCount.class);
-
- conf.setJobName("wordcount");
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(IntWritable.class);
- conf.setMapperClass(Map.class);
- conf.setCombinerClass(Reduce.class);
- conf.setReducerClass(Reduce.class);
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(TextOutputFormat.class);
- FileInputFormat.setInputPaths(conf, new Path(args[0]));
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
- JobClient.runJob(conf);
- }
- }
- package org.myorg;
+
+ import java.io.IOException;
+ import java.util.*;
+
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.conf.*;
+ import org.apache.hadoop.io.*;
+ import org.apache.hadoop.mapreduce.*;
+ import org.apache.hadoop.mapreduce.lib.input.*;
+ import org.apache.hadoop.mapreduce.lib.output.*;
+ import org.apache.hadoop.util.*;
+
+ public class WordCount extends Configured implements Tool {
+
+   public static class Map
+       extends Mapper<LongWritable, Text, Text, IntWritable> {
+     private final static IntWritable one = new IntWritable(1);
+     private Text word = new Text();
+
+     public void map(LongWritable key, Text value, Context context)
+         throws IOException, InterruptedException {
+       String line = value.toString();
+       StringTokenizer tokenizer = new StringTokenizer(line);
+       while (tokenizer.hasMoreTokens()) {
+         word.set(tokenizer.nextToken());
+         context.write(word, one);
+       }
+     }
+   }
+
+   public static class Reduce
+       extends Reducer<Text, IntWritable, Text, IntWritable> {
+     public void reduce(Text key, Iterable<IntWritable> values,
+         Context context) throws IOException, InterruptedException {
+       int sum = 0;
+       for (IntWritable val : values) {
+         sum += val.get();
+       }
+       context.write(key, new IntWritable(sum));
+     }
+   }
+
+   public int run(String[] args) throws Exception {
+     Job job = new Job(getConf());
+     job.setJarByClass(WordCount.class);
+     job.setJobName("wordcount");
+
+     job.setOutputKeyClass(Text.class);
+     job.setOutputValueClass(IntWritable.class);
+
+     job.setMapperClass(Map.class);
+     job.setCombinerClass(Reduce.class);
+     job.setReducerClass(Reduce.class);
+
+     job.setInputFormatClass(TextInputFormat.class);
+     job.setOutputFormatClass(TextOutputFormat.class);
+
+     FileInputFormat.setInputPaths(job, new Path(args[0]));
+     FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+     boolean success = job.waitForCompletion(true);
+     return success ? 0 : 1;
+   }
+
+   public static void main(String[] args) throws Exception {
+     int ret = ToolRunner.run(new WordCount(), args);
+     System.exit(ret);
+   }
+ }
$ mkdir wordcount_classes
- $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar
+ $ javac -classpath
+ ${HADOOP_HOME}/hadoop-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/hadoop-mapred-${HADOOP_VERSION}.jar:${HADOOP_HOME}/hadoop-hdfs-${HADOOP_VERSION}.jar
-d wordcount_classes WordCount.java
- $ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .
+ $ jar -cvf /user/joe/wordcount.jar -C wordcount_classes/ .
Assuming that:
- /usr/joe/wordcount/input - input directory in HDFS
+ /user/joe/wordcount/input - input directory in HDFS
- /usr/joe/wordcount/output - output directory in HDFS
+ /user/joe/wordcount/output - output directory in HDFS
Sample text-files as input:
- $ bin/hadoop dfs -ls /usr/joe/wordcount/input/
- /usr/joe/wordcount/input/file01
- /usr/joe/wordcount/input/file02
+ $ bin/hadoop fs -ls /user/joe/wordcount/input/
+ /user/joe/wordcount/input/file01
+ /user/joe/wordcount/input/file02
- $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
+ $ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World
- $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
+ $ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
Run the application:
- $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
- /usr/joe/wordcount/input /usr/joe/wordcount/output
+ $ bin/hadoop jar /user/joe/wordcount.jar org.myorg.WordCount
+ /user/joe/wordcount/input /user/joe/wordcount/output
Output:
- $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
+ $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
@@ -602,19 +376,27 @@
Hello 2
World 2
- Applications can specify a comma separated list of paths which
+ Applications can specify a comma-separated list of paths which
would be present in the current working directory of the task
using the option -files
. The -libjars
option allows applications to add jars to the classpaths of the maps
and reduces. The option -archives
allows them to pass a
comma-separated list of archives as arguments. These archives are
unarchived, and a link with the name of the archive is created in
- the current working directory of tasks. More
- details about the command line options are available at
+ the current working directory of tasks. The mechanism that
+ provides this functionality is called the distributed cache.
+ More details about the command line options surrounding job launching
+ and control of the distributed cache are available at
Hadoop Commands Guide.
Running wordcount
example with
+
+ Hadoop ships with some example code in a jar precompiled for you;
+ one of these is (another) wordcount program. Here's an example
+ invocation of the wordcount
example with
-libjars
, -files
and -archives
:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt
@@ -634,18 +416,26 @@
Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by
tasks using the symbolic names dict1 and dict2 respectively.
And the archive mytar.tgz will be placed and unarchived into a
- directory by the name tgzdir
+ directory by the name tgzdir.
+ The distributed cache is also described in greater detail further
+ down in this tutorial.
The WordCount
application is quite straight-forward.
+ This section describes the operation of the WordCount
+ application shown earlier in this tutorial.
- The Mapper
- implementation (lines 14-26), via the
- map
- method (lines 18-25), processes one line at a time,
- as provided by the specified TextInputFormat
- (line 49).
+ The Mapper
+ implementation (lines 16-30), via the
+ map
+ method (lines 21-29), processes one line at a time,
+ as provided by the specified TextInputFormat
+ (line 57).
It then splits the line into tokens separated by whitespaces, via the
StringTokenizer
, and emits a key-value pair of
< <word>, 1>
.
WordCount
also specifies a combiner
(line
- 46). Hence, the output of each map is passed through the local combiner
- (which is same as the Reducer
- as per the job
- configuration) for local aggregation, after being sorted on the
- keys.
+ (which is the same as the Reducer
+ as per the job configuration) for local aggregation, after being
+ sorted on the keys.
The output of the first map:
@@ -690,9 +482,12 @@
< Hello, 1>
- The Reducer
- implementation (lines 28-36), via the
- reduce
- method (lines 29-35) just sums up the values,
- which are the occurence counts for each key (i.e. words in this example).
+ The Reducer
+ implementation (lines 32-43), via the
+ reduce
+ method (lines 34-42) just sums up the values,
+ which are the occurrence counts for each key (i.e. words in this
+ example).
@@ -706,12 +501,19 @@
The run
method specifies various facets of the job, such
as the input/output paths (passed via the command line), key/value
- types, input/output formats etc., in the JobConf.
- It then calls the JobClient.runJob (line 55) to submit
- the job and monitor its progress.
- We'll learn more about JobConf, JobClient,
- Tool and other interfaces and classes a bit later in the
+ types, input/output formats etc., in the Job.
+ It then calls the Job.waitForCompletion() (line 63)
+ to submit the job to Hadoop and monitor its progress.
+ We'll learn more about Job,
+ Mapper,
+ Tool
+ and other interfaces and classes a bit later in the
tutorial.
- This section provides a reasonable amount of detail on every user-facing
- aspect of the MapReduce framwork. This should help users implement,
- configure and tune their jobs in a fine-grained manner. However, please
- note that the javadoc for each class/interface remains the most
- comprehensive documentation available; this is only meant to be a tutorial.
+ This section provides a reasonable amount of detail on every
+ user-facing aspect of the MapReduce framework. This should help users
+ implement, configure and tune their jobs in a fine-grained manner.
+ However, please note that the javadoc for each class/interface remains
+ the most comprehensive documentation available; this is only meant to
+ be a tutorial.
- Let us first take the Mapper and Reducer
- interfaces. Applications typically implement them to provide the
+ Let us first take the Mapper and Reducer
+ classes. Applications typically extend them to provide the
map
and reduce
methods.
- We will then discuss other core interfaces including
- JobConf, JobClient, Partitioner,
- OutputCollector, Reporter,
- InputFormat, OutputFormat,
- OutputCommitter and others.
+ We will then discuss other core classes including
+ Job,
+ Partitioner,
+ Context,
+ InputFormat,
+ OutputFormat,
+ OutputCommitter
+ and others.
Finally, we will wrap up by discussing some useful features of the
framework such as the DistributedCache.
@@ -743,16 +558,17 @@
- Applications typically implement the Mapper and
- Reducer interfaces to provide the map and
+ Applications typically extend the Mapper and
+ Reducer classes to provide the map and
reduce methods. These form the core of the job.
@@ -760,29 +576,78 @@
- Mapper maps input key/value pairs to a set of intermediate
- key/value pairs.
+ Mapper maps input key/value pairs to a set of
+ intermediate key/value pairs.
Maps are the individual tasks that transform input records into
intermediate records. The transformed intermediate records do not need
to be of the same type as the input records. A given input pair may
map to zero or many output pairs.
The Hadoop MapReduce framework spawns one map task for each
- InputSplit generated by the InputFormat for
- the job.
+ InputSplit generated by the
+ InputFormat for the job. An InputSplit is a logical representation of
+ a unit of input work for a map task; e.g., a filename and a byte
+ range within that file to process. The InputFormat is
+ responsible for enumerating the InputSplits, and
+ producing a RecordReader which will turn those
+ logical work units into actual physical input records.
- Overall, Mapper implementations are passed the
- JobConf for the job via the
- JobConfigurable.configure(JobConf) method and override it to
- initialize themselves. The framework then calls
- map(WritableComparable, Writable, OutputCollector, Reporter) for
- each key/value pair in the InputSplit for that task.
- Applications can then override the
- Closeable.close() method to perform any required cleanup.
+ Overall, Mapper implementations are specified in the Job,
+ a client-side class that describes the job's
+ configuration and interfaces with the cluster on behalf of the
+ client program. The Mapper itself then is instantiated
+ in the running job, and is passed a MapContext object
+ which it can use to configure itself. The Mapper
+ contains a run() method which calls its setup()
+ method once, its map() method for each input record,
+ and finally its cleanup() method. All of these methods
+ (including run() itself) can be overridden with
+ your own code. If you do not override any methods (leaving even
+ map as-is), it will act as the identity function, emitting
+ each input record as a separate output.
+ The Context object allows the mapper to interact
+ with the rest of the Hadoop system. It includes configuration
+ data for the job, as well as interfaces which allow it to emit
+ output. The getConfiguration() method returns a
+ Configuration which contains configuration data
+ for your program. You can set arbitrary (key, value) pairs of
+ configuration data in your Job, e.g. with
+ Job.getConfiguration().set("myKey", "myVal"),
+ and then retrieve this data in your mapper with
+ Context.getConfiguration().get("myKey"). This sort of
+ functionality is typically done in the Mapper's setup()
+ method.
+ The Mapper.run() method then calls
+ map(KeyInType, ValInType, Context) for
+ each key/value pair in the InputSplit for that task.
+ Note that in the WordCount program's map() method, we then emit
+ our output data via the Context argument, using its
+ write() method.
+ Applications can then override the Mapper's cleanup()
+ method to perform any required teardown operations.
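A sketch of the setup()/cleanup() pairing (the per-task side file is a
hypothetical resource, and error handling is elided):

  public static class MyMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private java.io.PrintWriter sideFile;

    @Override
    protected void setup(Context context) throws IOException {
      sideFile = new java.io.PrintWriter("side-output.txt"); // acquire once
    }

    @Override
    protected void cleanup(Context context) {
      sideFile.close();                                      // release once
    }
  }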
Output pairs do not need to be of the same types as input pairs. A
given input pair may map to zero or many output pairs. Output pairs
- are collected with calls to
- OutputCollector.collect(WritableComparable, Writable).
+ are collected with calls to
+ Context.write(KeyOutType, ValOutType).
- Applications can use the Reporter to report
+ Applications can also use the Context to report
progress, set application-level status messages and update
Counters, or just indicate that they are alive.
Reducer(s) to determine the final output. Users can
control the grouping by specifying a Comparator via
- JobConf.setOutputKeyComparatorClass(Class).
+ Job.setGroupingComparatorClass(Class).
+ If a grouping comparator is not specified, then all values with the
+ same key will be presented by an unordered Iterable to
+ a call to the Reducer.reduce() method.
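As a sketch of how a grouping comparator can be supplied (the
comparator class is hypothetical; only Job.setGroupingComparatorClass
is taken from the text above):

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  // Groups Text keys by their first whitespace-separated token, so all
  // values whose keys share that token arrive in one reduce() call.
  public class FirstTokenGroupingComparator extends WritableComparator {
    protected FirstTokenGroupingComparator() {
      super(Text.class, true); // true: instantiate keys for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      String ta = a.toString().split("\\s+")[0];
      String tb = b.toString().split("\\s+")[0];
      return ta.compareTo(tb);
    }
  }

  // In the driver:
  // job.setGroupingComparatorClass(FirstTokenGroupingComparator.class);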
- The Mapper outputs are sorted and then
+ The Mapper outputs are sorted and
partitioned per Reducer. The total number of partitions is
the same as the number of reduce tasks for the job. Users can control
which keys (and hence records) go to which Reducer by
- implementing a custom Partitioner.
+ implementing a custom Partitioner.
Users can optionally specify a combiner, via
- JobConf.setCombinerClass(Class), to perform local aggregation of
+ Job.setCombinerClass(Class), to perform local aggregation of
the intermediate outputs, which helps to cut down the amount of data
transferred from the Mapper to the Reducer.
- CompressionCodec to be used via the JobConf.
+ CompressionCodec to be used via the Job.
Thus, if you expect 10TB of input data and have a blocksize of
- 128MB, you'll end up with 82,000 maps, unless
- setNumMapTasks(int) (which only provides a hint to the framework)
- is used to set it even higher.
+ 128MB, you'll end up with 82,000 maps, unless the
+ mapreduce.job.maps parameter
+ (which only provides a hint to the
+ framework) is used to set it even higher. Ultimately, the number
+ of tasks is controlled by the number of splits returned by the
+ InputFormat.getSplits() method (which you can
+ override).
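For instance, a driver could pass the hint through the job's
configuration (a sketch; the property name is the one given above and
the count is arbitrary):

  // A hint, not a guarantee; InputFormat.getSplits() has the final say.
  job.getConfiguration().setInt("mapreduce.job.maps", 100000);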
- Reducer reduces a set of intermediate values which share a key to
- a smaller set of values.
- The number of reduces for the job is set by the user via
- JobConf.setNumReduceTasks(int).
- Overall, Reducer implementations are passed the
- JobConf for the job via the
- JobConfigurable.configure(JobConf) method and can override it to
- initialize themselves. The framework then calls
- reduce(WritableComparable, Iterator, OutputCollector, Reporter)
- method for each <key, (list of values)>
- pair in the grouped inputs. Applications can then override the
- Closeable.close() method to perform any required cleanup.
+ Reducer reduces a set of intermediate values which
+ share a key to a (usually smaller) set of values.
+ The number of reduces for the job is set by the user via
+ Job.setNumReduceTasks(int).
+ The API of Reducer is very similar to that of
+ Mapper; there's a run() method that receives
+ a Context containing the job's configuration as
+ well as interfacing methods that return data from the reducer itself
+ back to the framework. The run() method calls setup() once,
+ reduce() once for each key associated with the
+ reduce task, and cleanup() once at the end. Each of these methods
+ can access the job's configuration data by using
+ Context.getConfiguration().
+ As in Mapper, any or all of these methods can be
+ overridden with custom implementations. If none of these methods are
+ overridden, the default reducer operation is the identity function;
+ values are passed through without further processing.
+ The heart of Reducer is its reduce()
+ method. This is called once per key; the second argument is an
+ Iterable which returns all the values associated with
+ that key. In the WordCount example, this is all of the 1's or other
+ partial counts associated with a given word. The Reducer should
+ emit its final output (key, value) pairs with the
+ Context.write() method. It may emit 0, 1, or more
+ (key, value) pairs for each input.
Reducer
has 3 primary phases: shuffle, sort and reduce.
If equivalence rules for grouping the intermediate keys are
required to be different from those for grouping keys before
reduction, then one may specify a Comparator
via
-
- JobConf.setOutputValueGroupingComparator(Class). Since
- JobConf.setOutputKeyComparatorClass(Class) can be used to
- control how intermediate keys are grouped, these can be used in
- conjunction to simulate secondary sort on values.
In this phase the
- reduce(WritableComparable, Iterator, OutputCollector, Reporter)
- method is called for each <key, (list of values)>
- pair in the grouped inputs.
+ reduce(MapOutKeyType, Iterable<MapOutValType>, Context)
+ method is called for each <key, (list of values)>
+ pair in the grouped inputs.
- The output of the reduce task is typically written to the FileSystem via
- OutputCollector.collect(WritableComparable, Writable).
+ The output of the reduce task is typically written to the FileSystem via
+ Context.write(ReduceOutKeyType, ReduceOutValType).
- Applications can use the Reporter to report
- progress, set application-level status messages and update
- Counters, or just indicate that they are alive.
+ Applications can use the Context to report
+ progress, set application-level status messages and update
+ Counters,
+ or just indicate that they are alive.
The output of the Reducer
is not sorted.
- Increasing the number of reduces increases the framework overhead,
- but increases load balancing and lowers the cost of failures.
+ Increasing the number of reduces increases the framework
+ overhead, but increases load balancing and lowers the cost of
+ failures.
- The scaling factors above are slightly less than whole numbers to
- reserve a few reduce slots in the framework for speculative-tasks and
- failed tasks.
+ The scaling factors above are slightly less than whole numbers to
+ reserve a few reduce slots in the framework for speculative-tasks
+ and failed tasks.
In this case the outputs of the map-tasks go directly to the
+ reserve a few reduce slots in the framework for speculative-tasks + and failed tasks.In this case the outputs of the map-tasks go directly to the
FileSystem
, into the output path set by
-
+
setOutputPath(Path). The framework does not sort the
map-outputs before writing them out to the FileSystem
.
- While applications iterate through the values for a given key, it is
- possible to mark the current position and later reset the iterator to
- this position and continue the iteration process. The corresponding
- methods are mark() and reset().
+ While applications iterate through the values for a given key, it
+ is possible to mark the current position and later reset the
+ iterator to this position and continue the iteration process.
+ The corresponding methods are mark() and
+ reset().
mark()
and reset()
can be called any
- number of times during the iteration cycle. The reset()
+ number of times during the iteration cycle. The reset()
method will reset the iterator to the last record before a call to
the previous mark()
.
- values.mark();
+ mitr.mark();
- while (values.hasNext()) {
+ while (mitr.hasNext()) {
- i = values.next();
+ i = mitr.next();
- values.reset();
+ mitr.reset();
- // call to values.next() in this example, we will iterate over all
+ // call to mitr.next() in this example, we will iterate over all
- while (values.hasNext()) {
+ while (mitr.hasNext()) {
- i = values.next();
+ i = mitr.next();
- Partitioner partitions the key space.
+ Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the
intermediate map-outputs. The key (or a subset of the key) is used to
@@ -1133,103 +1038,111 @@
job. Hence this controls which of the m
reduce tasks the
intermediate key (and hence the record) is sent to for reduction.
- HashPartitioner is the default Partitioner.
+ HashPartitioner is the default
+ Partitioner.
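A sketch of a custom partitioner (the class is hypothetical and, for
Text keys, behaves much like HashPartitioner itself):

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;

  public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      // Mask the sign bit so the result is a valid partition index.
      return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  // In the driver:
  // job.setPartitionerClass(WordPartitioner.class);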
- Reporter is a facility for MapReduce applications to report
- progress, set application-level status messages and update
- Counters.
+ Via the mapper or reducer's Context, MapReduce applications can
+ report progress, set application-level status messages and update
+ Counters.
- Mapper and Reducer implementations can use
- the Reporter to report progress or just indicate
+ Mapper and Reducer implementations can
+ use the Context to report progress or just indicate
that they are alive. In scenarios where the application takes a
significant amount of time to process individual key/value pairs,
this is crucial since the framework might assume that the task has
timed-out and kill that task. Another way to avoid this is to
- set the configuration parameter mapreduce.task.timeout to a
- high-enough value (or even set it to zero for no time-outs).
+ set the configuration parameter mapreduce.task.timeout
+ to a high-enough value (or even set it to zero for no
+ time-outs).
- Applications can also update Counters using the
- Reporter.
+ Applications can also update Counters using the
+ Context.
- OutputCollector is a generalization of the facility provided by
- the MapReduce framework to collect data output by the
- Mapper or the Reducer (either the
- intermediate outputs or the output of the job).
- Hadoop MapReduce comes bundled with a
- library of generally useful mappers, reducers, and partitioners.
+ Hadoop MapReduce comes bundled with a
+ library of generally useful mappers, reducers, and partitioners
+ in the
package.
- JobConf represents a MapReduce job configuration.
+The Job
represents a MapReduce job configuration.
+ The actual state for this object is written to an underlying instance of
+ Configuration.
JobConf
is the primary interface for a user to describe
+
Job
is the primary interface for a user to describe
a MapReduce job to the Hadoop framework for execution. The framework
- tries to faithfully execute the job as described by JobConf
,
+ tries to faithfully execute the job as described by Job
,
however:
setNumReduceTasks(int)
), other parameters interact
+ subtly with the rest of the framework and/or job configuration
+ and are more complex to set (e.g. mapreduce.job.maps
).
JobConf
is typically used to specify the
+
The Job
is typically used to specify the
Mapper
, combiner (if any), Partitioner
,
Reducer
, InputFormat
,
OutputFormat
and OutputCommitter
- implementations. JobConf
also
+ implementations. Job
also
indicates the set of input files
- (setInputPaths(JobConf, Path...)
- /addInputPath(JobConf, Path))
- and (setInputPaths(JobConf, String)
- /addInputPaths(JobConf, String))
+ (setInputPaths(Job, Path...)
+ /addInputPath(Job, Path))
+ and (setInputPaths(Job, String)
+ /addInputPaths(Job, String))
and where the output files should be written
- (setOutputPath(Path)).
Optionally, JobConf
is used to specify other advanced
+
Optionally, Job
is used to specify other advanced
facets of the job such as the Comparator
to be used, files
to be put in the DistributedCache
, whether intermediate
and/or job outputs are to be compressed (and how), debugging via
- user-provided scripts
- (setMapDebugScript(String)/setReduceDebugScript(String))
- , whether job tasks can be executed in a speculative manner
- (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean))
+ user-provided scripts,
+ whether job tasks can be executed in a speculative manner
+ (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean))
, maximum number of attempts per task
- (setMaxMapAttempts(int)/setMaxReduceAttempts(int))
+ (setMaxMapAttempts(int)/setMaxReduceAttempts(int))
, percentage of tasks failure which can be tolerated by the job
- (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int))
- etc.
+ (Job.getConfiguration().setInt(Job.MAP_FAILURES_MAX_PERCENT,
+ int)/Job.getConfiguration().setInt(Job.REDUCE_FAILURES_MAX_PERCENT,
+ int)), etc.
- Of course, users can use
- set(String, String)/get(String, String)
+ Of course, users can use Job.getConfiguration() to get
+ access to the underlying configuration state, and can then use
+ set(String, String)/get(String, String)
to set/get arbitrary parameters needed by applications. However, use the
DistributedCache
for large amounts of (read-only) data.
The child-task inherits the environment of the parent
TaskTracker
. The user can specify additional options to the
child-jvm via the mapred.{map|reduce}.child.java.opts
- configuration parameter in the JobConf such as non-standard
+ configuration parameter in the job configuration such as non-standard
paths for the run-time linker to search shared libraries via
-Djava.library.path=<>
etc. If the
mapred.{map|reduce}.child.java.opts
parameters contains the
@@ -1295,7 +1208,7 @@
that the value set here is a per process limit.
The value for mapred.{map|reduce}.child.ulimit
should be
specified in kilo bytes (KB). And also the value must be greater than
- or equal to the -Xmx passed to JavaVM, else the VM might not start.
+ or equal to the -Xmx passed to JavaVM, or else the VM might not start.
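For example (a sketch; conf is the job's Configuration, and the memory
values are arbitrary, chosen so the ulimit exceeds -Xmx):

  conf.set("mapred.map.child.java.opts", "-Xmx512M"); // per-task JVM heap
  conf.set("mapred.map.child.ulimit", "1048576");     // 1 GB, in KB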
Note: mapred.{map|reduce}.child.java.opts
are used only
@@ -1366,7 +1279,7 @@
- example, if io.sort.buffer.spill.percent is set to
+ example, if mapreduce.map.sort.spill.percent is set to
0.33, and the remainder of the buffer is filled while the spill
runs, the next spill will include all the collected records, or
0.66 of the buffer, and will not generate additional spills. In
@@ -1481,9 +1394,7 @@
: The job-specific shared directory. The tasks can use this space as
scratch space and share files among them. This directory is exposed
to the users through the configuration property
- mapreduce.job.local.dir. The directory can accessed through
- api JobConf.getJobLocalDir(). It is available as System property also.
+ mapreduce.job.local.dir. It is available as System property also.
So, users (streaming etc.) can call
System.getProperty("mapreduce.job.local.dir")
to access the
directory.
job.jar
and its contents are
automatically added to the classpath for each task.
The job.jar location is accessible to the application through the api
-
- JobConf.getJar(). To access the unjarred directory,
- JobConf.getJar().getParent() can be called.
+ Job.getJar(). To access the unjarred directory,
+ Job.getJar().getParent() can be called.
${mapreduce.cluster.local.dir}/taskTracker/jobcache/$jobid/job.xml
: The job.xml file, the generic job configuration, localized for
the job. Job.getConfiguration().setInt(Job.JVM_NUM_TASKS_TO_RUN, int)
.
- JobClient is the primary interface by which user-job interacts +
The Job
+ is the primary interface by which user-job interacts
with the JobTracker
.
JobClient
provides facilities to submit jobs, track their
+
Job
provides facilities to submit jobs, track their
progress, access component-tasks' reports and logs, get the MapReduce
cluster's status information and so on.
Normally the user creates the application, describes various facets
- of the job via JobConf
, and then uses the
- JobClient
to submit the job and monitor its progress.
Job
, and then uses the
+ waitForCompletion()
method to submit the job and monitor its progress.
Job.waitForCompletion()
:
+ Submits the job and returns only after the
job has completed.
Job.submit()
: Only submits the job; then poll the
+ other methods of Job
such as isComplete()
,
+ isSuccessful()
, etc.
+ to query status and make scheduling decisions.
Job.getConfiguration().set(Job.END_NOTIFICATION_URL, String)
+ : Sets up a notification upon job-completion, thus avoiding polling.
+
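A minimal polling sketch using only the methods named above (assumed
to run inside a method declared to throw Exception):

  Job job = new Job(getConf());
  // ... configure the job as in the WordCount example ...
  job.submit();                 // returns immediately
  while (!job.isComplete()) {   // poll the job's status
    Thread.sleep(5000);         // avoid a busy-wait
  }
  System.out.println(job.isSuccessful() ? "succeeded" : "failed");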
InputFormat describes the input-specification for a MapReduce job.
The default behavior of file-based
+
TextInputFormat is the default If
+
InputSplit represents the data to be processed by an individual
+
FileSplit is the default
+
RecordReader reads
+
OutputFormat describes the output-specification for a MapReduce
job. It is possible to delay creation of output until the first write attempt
- by using
+ by using
LazyOutputFormat. This is particularly useful in preventing the
creation of zero byte files when there is no call to output.collect
(or Context.write). This is achieved by calling the static method
@@ -1813,8 +1721,8 @@
-
+
OutputCommitter describes the commit of task output for a
MapReduce job. To avoid these issues the MapReduce framework, when the
The application-writer can take advantage of this feature by
creating any side-files required in
+
RecordWriter writes the output A job defines the queue it needs to be submitted to through the
- Applications can define arbitrary Applications specify the files to be cached via urls (hdfs://)
- in the User can also specify the profiler configuration arguments by
setting the configuration property
Applications can control compression of intermediate map-outputs
- via the
-
- JobConf.setCompressMapOutput(boolean) api and the
- InputFormat
implementations, typically sub-classes of
-
+
FileInputFormat, is to split the input into logical
InputSplit
instances based on the total size, in bytes, of
the input files. However, the FileSystem
blocksize of the
@@ -1733,7 +1641,7 @@
record-oriented view of the logical InputSplit
to the
individual task.InputFormat
.TextInputFormat
is the InputFormat
for a
@@ -1746,7 +1654,7 @@
Mapper
.RecordReader
to process and present a record-oriented view.InputSplit
. It sets
mapreduce.map.input.file
to the path of the input file for the
logical split.<key, value>
pairs from an
InputSplit
. import org.apache.hadoop.mapred.lib.LazyOutputFormat;
- LazyOutputFormat.setOutputFormatClass(conf, TextOutputFormat.class);
+ import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+ LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileOutputCommitter
is the default
+ FileOutputCommitter
+ is the default
OutputCommitter
. Job setup/cleanup tasks occupy
map or reduce slots, whichever is free on the TaskTracker. And
JobCleanup task, TaskCleanup tasks and JobSetup task have the highest
@@ -1887,20 +1798,22 @@
OutputCommitter
is FileOutputCommitter
,
maintains a special
- ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
sub-directory
+ ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
+ sub-directory
accessible via ${mapreduce.task.output.dir}
for each task-attempt on the FileSystem
where the output
of the task-attempt is stored. On successful completion of the
task-attempt, the files in the
- ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
(only)
- are promoted to ${mapreduce.output.fileoutputformat.outputdir}
. Of course,
+ ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
+ (only) are promoted to
+ ${mapreduce.output.fileoutputformat.outputdir}
. Of course,
the framework discards the sub-directory of unsuccessful task-attempts.
This process is completely transparent to the application.${mapreduce.task.output.dir}
during execution of a task via
-
+
FileOutputFormat.getWorkOutputPath(), and the framework will promote them
similarly for succesful task-attempts, thus eliminating the need to
pick unique paths per task-attempt.${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}
, and this value is
set by the MapReduce framework. So, just create any side-files in the
path returned by
-
+
FileOutputFormat.getWorkOutputPath() from MapReduce
task to take advantage of this feature.<key, value>
pairs to an output file.mapreduce.job.queuename
property, or through the
- setQueueName(String)
- API. Setting the queue name is optional. If a job is submitted
+ mapreduce.job.queuename
property.
+ Setting the queue name is optional. If a job is submitted
without an associated queue name, it is submitted to the 'default'
queue.
Counters
represent global counters, defined either by
- the MapReduce framework or applications. Each Counter
can
+ Counters
represent global counters, defined either by
+ the MapReduce framework or applications. Each Counter
can
be of any Enum
type. Counters of a particular
Enum
are bunched into groups of type
Counters.Group
.Counters
(of type
- Enum
) and update them via
-
- Reporter.incrCounter(Enum, long) or
-
- Reporter.incrCounter(String, String, long)
- in the map
and/or
- reduce
methods. These counters are then globally
- aggregated by the framework.Enum
); get a Counter
object from the task's
+ Context with the getCounter()
method, and then call
+ the Counter.increment(long)
method to increment its
+ value locally. These counters are then globally aggregated by the framework.JobConf
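A small sketch of an application-defined counter (the enum and its
values are hypothetical):

  public enum RecordQuality { WELL_FORMED, MALFORMED }

  // Inside map() or reduce():
  // context.getCounter(RecordQuality.MALFORMED).increment(1);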
. The DistributedCache
+ in the Job
. The DistributedCache
assumes that the files specified via hdfs:// urls are already present
on the FileSystem
.
public static class MapClass extends Mapper<K, V, K, V> {
- private Path[] localArchives;
- private Path[] localFiles;
- public void setup(Context context) {
- // Get the cached archives/files
- localArchives = context.getLocalCacheArchives();
- localFiles = context.getLocalCacheFiles();
- }
+ private Path[] localArchives;
+ private Path[] localFiles;
+ public void setup(Context context) {
+ // Get the cached archives/files
+ localArchives = context.getLocalCacheArchives();
+ localFiles = context.getLocalCacheFiles();
+ }
- public void map(K key, V value,
+
public void map(K key, V value,
Context context) throws IOException {
- // Use data from the cached archives/files here
- // ...
- // ...
- context.write(k, v);
- }
+ // Use data from the cached archives/files here
+ // ...
+ // ...
+ context.write(k, v);
+ }
}
mapreduce.task.profile
. The
value can be set using the api
-
- JobConf.setProfileEnabled(boolean). If the value is set
+ Job.setProfileEnabled(boolean). If the value is set
true
, the task profiling is enabled. The profiler
information is stored in the user log directory. By default,
profiling is not enabled for the job. mapreduce.task.profile.{maps|reduces}
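For example (a sketch; conf is the job's Configuration and the
property names are those given above):

  conf.setBoolean("mapreduce.task.profile", true);   // enable profiling
  conf.set("mapreduce.task.profile.maps", "0-2");    // map tasks to profile
  conf.set("mapreduce.task.profile.reduces", "0-2"); // reduce tasks to profile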
to set the ranges
of MapReduce tasks to profile. The value can be set using the api
-
- JobConf.setProfileTaskRange(boolean,String).
+
+ Job.setProfileTaskRange(boolean,String).
By default, the specified range is 0-2
.mapreduce.task.profile.params
. The value can be specified
using the api
-
- JobConf.setProfileParams(String). If the string contains a
+
+ Job.setProfileParams(String). If the string contains a
%s
, it will be replaced with the name of the profiling
output file when the task runs. These parameters are passed to the
task child JVM on the command line. The default value for
@@ -2224,10 +2139,9 @@
properties mapreduce.map.debug.script
and
mapreduce.reduce.debug.script
, for debugging map and
reduce tasks respectively. These properties can also be set by using APIs
-
- JobConf.setMapDebugScript(String) and
- JobConf.setReduceDebugScript(String). In streaming mode, a debug
+ Job.getConfiguration().set(Job.MAP_DEBUG_SCRIPT, String)
+ and Job.getConfiguration().set(Job.REDUCE_DEBUG_SCRIPT,
+ String). In streaming mode, a debug
script can be submitted with the command-line options
-mapdebug
and -reducedebug
, for debugging
map and reduce tasks respectively.CompressionCodec
to be used via the
-
- JobConf.setMapOutputCompressorClass(Class) api.Job.getConfiguration().setBoolean(Job.MAP_OUTPUT_COMPRESS, bool)
+ api and the CompressionCodec
to be used via the
+ Job.getConfiguration().setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, Class,
+ CompressionCodec.class)
api.
Applications can control compression of job-outputs via the
-
- FileOutputFormat.setCompressOutput(JobConf, boolean) api and the
+
+ FileOutputFormat.setCompressOutput(Job, boolean) api and the
CompressionCodec
to be used can be specified via the
-
- FileOutputFormat.setOutputCompressorClass(JobConf, Class) api.
If the job outputs are to be stored in the
-
+
SequenceFileOutputFormat, the required
SequenceFile.CompressionType
(i.e. RECORD
/
BLOCK
- defaults to RECORD
) can be
specified via the
-
- SequenceFileOutputFormat.setOutputCompressionType(JobConf,
+
+ SequenceFileOutputFormat.setOutputCompressionType(Job,
SequenceFile.CompressionType) api.
Skipped records are written to HDFS in the sequence file format, for later analysis. The location can be changed through [... 1242 lines stripped ...]