From: cdouglas@apache.org To: mapreduce-commits@hadoop.apache.org Subject: svn commit: r937736 [2/3] - in /hadoop/mapreduce/trunk: CHANGES.txt src/docs/src/documentation/content/xdocs/mapred_tutorial.xml src/docs/src/documentation/content/xdocs/site.xml Date: Sun, 25 Apr 2010 02:33:18 -0000 Message-Id: <20100425023318.33BED23889C5@eris.apache.org> Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=937736&r1=937735&r2=937736&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original) +++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Sun Apr 25 02:33:17 2010 @@ -67,7 +67,8 @@

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the - Hadoop Distributed File System (HDFS) + Hadoop + Distributed File System (HDFS) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the @@ -117,11 +118,14 @@ the job, conceivably of different types.

The key and value classes have to be - serializable by the framework and hence need to implement the - Writable - interface. Additionally, the key classes have to implement the + serializable by the framework. Several serialization systems exist; the + default serialization mechanism requires keys and values to implement + the + Writable interface. + Additionally, the key classes must facilitate sorting by the + framework; a straightforward means to do so is for them to implement the - WritableComparable interface to facilitate sorting by the framework. + WritableComparable interface.
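To make those two requirements concrete, here is a minimal, hedged sketch of a custom key type; the WordYear class and its fields are invented for illustration, but write/readFields/compareTo are exactly what the Writable and WritableComparable interfaces ask for (hashCode also matters, since the default partitioner uses it):

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableComparable;

  // Illustrative composite key usable with the default (Writable) serialization.
  public class WordYear implements WritableComparable<WordYear> {
    private String word = "";
    private int year;

    public void set(String word, int year) { this.word = word; this.year = year; }

    public void write(DataOutput out) throws IOException {
      out.writeUTF(word);                  // serialize fields in a fixed order
      out.writeInt(year);
    }

    public void readFields(DataInput in) throws IOException {
      word = in.readUTF();                 // deserialize in the same order
      year = in.readInt();
    }

    public int compareTo(WordYear other) {
      int cmp = word.compareTo(other.word);          // sort by word, then year
      return cmp != 0 ? cmp : (year < other.year ? -1 : (year == other.year ? 0 : 1));
    }

    public int hashCode() { return word.hashCode() * 163 + year; }

    public boolean equals(Object o) {
      return (o instanceof WordYear) && compareTo((WordYear) o) == 0;
    }
  }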

Input and Output types of a MapReduce job:

@@ -132,7 +136,7 @@ -> <k2, v2> -> - combine + combine* -> <k2, v2> -> @@ -140,6 +144,8 @@ -> <k3, v3> (output)

+

Note that the combine phase may run zero or more times in this + process.
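As a hedged sketch of how these types are declared to the framework, the driver names the intermediate (k2, v2) and final (k3, v3) classes on the Job; the MyMapper/MyReducer class names below are placeholders, and the combiner must map (k2, v2) pairs back to (k2, v2) pairs so that running it zero or more times does not change the result:

  Job job = new Job(getConf());                    // as in the WordCount driver below
  job.setMapperClass(MyMapper.class);              // (k1, v1) -> (k2, v2)
  job.setCombinerClass(MyReducer.class);           // reused as combiner: valid here because (k2, v2) equals (k3, v3)
  job.setReducerClass(MyReducer.class);            // (k2, v2) -> (k3, v3)
  job.setMapOutputKeyClass(Text.class);            // k2
  job.setMapOutputValueClass(IntWritable.class);   // v2
  job.setOutputKeyClass(Text.class);               // k3
  job.setOutputValueClass(IntWritable.class);      // v3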

@@ -164,383 +170,150 @@ WordCount.java - - 1. - - package org.myorg; - - - - 2. - - - - 3. - - import java.io.IOException; - - - - 4. - - import java.util.*; - - - - 5. - - - - 6. - - import org.apache.hadoop.fs.Path; - - - - 7. - - import org.apache.hadoop.conf.*; - - - - 8. - - import org.apache.hadoop.io.*; - - - - 9. - - import org.apache.hadoop.mapred.*; - - - - 10. - - import org.apache.hadoop.util.*; - - - - 11. - - - - 12. - - public class WordCount { - - - - 13. - - - - 14. - -    - - public static class Map extends MapReduceBase - implements Mapper<LongWritable, Text, Text, IntWritable> { - - - - - 15. - -      - - private final static IntWritable one = new IntWritable(1); - - - - - 16. - -      - private Text word = new Text(); - - - - 17. - - - - 18. - -      - - public void map(LongWritable key, Text value, - OutputCollector<Text, IntWritable> output, - Reporter reporter) throws IOException { - - - - - 19. - -        - String line = value.toString(); - - - - 20. - -        - StringTokenizer tokenizer = new StringTokenizer(line); - - - - 21. - -        - while (tokenizer.hasMoreTokens()) { - - - - 22. - -          - word.set(tokenizer.nextToken()); - - - - 23. - -          - output.collect(word, one); - - - - 24. - -        - } - - - - 25. - -      - } - - - - 26. - -    - } - - - - 27. - - - - 28. - -    - - public static class Reduce extends MapReduceBase implements - Reducer<Text, IntWritable, Text, IntWritable> { - - - - - 29. - -      - - public void reduce(Text key, Iterator<IntWritable> values, - OutputCollector<Text, IntWritable> output, - Reporter reporter) throws IOException { - - - - - 30. - -        - int sum = 0; - - - - 31. - -        - while (values.hasNext()) { - - - - 32. - -          - sum += values.next().get(); - - - - 33. - -        - } - - - - 34. - -        - output.collect(key, new IntWritable(sum)); - - - - 35. - -      - } - - - - 36. - -    - } - - - - 37. - - - - 38. - -    - - public static void main(String[] args) throws Exception { - - - - - 39. - -      - - JobConf conf = new JobConf(WordCount.class); - - - - - 40. - -      - conf.setJobName("wordcount"); - - - - 41. - - - - 42. - -      - conf.setOutputKeyClass(Text.class); - - - - 43. - -      - conf.setOutputValueClass(IntWritable.class); - - - - 44. - - - - 45. - -      - conf.setMapperClass(Map.class); - - - - 46. - -      - conf.setCombinerClass(Reduce.class); - - - - 47. - -      - conf.setReducerClass(Reduce.class); - - - - 48. - - - - 49. - -      - conf.setInputFormat(TextInputFormat.class); - - - - 50. - -      - conf.setOutputFormat(TextOutputFormat.class); - - - - 51. - - - - 52. - -      - FileInputFormat.setInputPaths(conf, new Path(args[0])); - - - - 53. - -      - FileOutputFormat.setOutputPath(conf, new Path(args[1])); - - - - 54. - - - - 55. - -      - JobClient.runJob(conf); - - - - 57. - -    - } - - - - 58. - - } - - - - 59. - - +1.package org.myorg; + +2. + +3.import java.io.IOException; + +4.import java.util.*; + +5. + +6.import org.apache.hadoop.fs.Path; + +7.import org.apache.hadoop.conf.*; + +8.import org.apache.hadoop.io.*; + +9.import org.apache.hadoop.mapreduce.*; + +10.import org.apache.hadoop.mapreduce.lib.input.*; + +11.import org.apache.hadoop.mapreduce.lib.output.*; + +12.import org.apache.hadoop.util.*; + +13. + +14.public class WordCount extends Configured implements Tool { + +15. + +16.   public static class Map + +17.       extends Mapper<LongWritable, Text, Text, IntWritable> { + +18.     private final static IntWritable one = new IntWritable(1); + +19.     
private Text word = new Text(); + +20. + +21.     public void map(LongWritable key, Text value, Context context) + +22.         throws IOException, InterruptedException { + +23.       String line = value.toString(); + +24.       StringTokenizer tokenizer = new StringTokenizer(line); + +25.       while (tokenizer.hasMoreTokens()) { + +26.         word.set(tokenizer.nextToken()); + +27.         context.write(word, one); + +28.       } + +29.     } + +30.   } + +31. + +32.   public static class Reduce + +33.       extends Reducer<Text, IntWritable, Text, IntWritable> { + +34.     public void reduce(Text key, Iterable<IntWritable> values, + +35.         Context context) throws IOException, InterruptedException { + +36. + +37.       int sum = 0; + +38.       for (IntWritable val : values) { + +39.         sum += val.get(); + +40.       } + +41.       context.write(key, new IntWritable(sum)); + +42.     } + +43.   } + +44. + +45.   public int run(String [] args) throws Exception { + +46.     Job job = new Job(getConf()); + +47.     job.setJarByClass(WordCount.class); + +48.     job.setJobName("wordcount"); + +49. + +50.     job.setOutputKeyClass(Text.class); + +51.     job.setOutputValueClass(IntWritable.class); + +52. + +53.     job.setMapperClass(Map.class); + +54.     job.setCombinerClass(Reduce.class); + +55.     job.setReducerClass(Reduce.class); + +56. + +57.     job.setInputFormatClass(TextInputFormat.class); + +58.     job.setOutputFormatClass(TextOutputFormat.class); + +59. + +60.     FileInputFormat.setInputPaths(job, new Path(args[0])); + +61.     FileOutputFormat.setOutputPath(job, new Path(args[1])); + +62. + +63.     boolean success = job.waitForCompletion(true); + +64.     return success ? 0 : 1; + +65.   } + +66. + +67.   public static void main(String[] args) throws Exception { + +68.     int ret = ToolRunner.run(new WordCount(), args); + +69.     System.exit(ret); + +70.   } + +71.} + +72. +
@@ -553,47 +326,48 @@

$ mkdir wordcount_classes
- $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar + $ javac -classpath + ${HADOOP_HOME}/hadoop-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/hadoop-mapred-${HADOOP_VERSION}.jar:${HADOOP_HOME}/hadoop-hdfs-${HADOOP_VERSION}.jar -d wordcount_classes WordCount.java
- $ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ . + $ jar -cvf /user/joe/wordcount.jar -C wordcount_classes/ .

Assuming that:

  • - /usr/joe/wordcount/input - input directory in HDFS + /user/joe/wordcount/input - input directory in HDFS
  • - /usr/joe/wordcount/output - output directory in HDFS + /user/joe/wordcount/output - output directory in HDFS

Sample text-files as input:

- $ bin/hadoop dfs -ls /usr/joe/wordcount/input/
- /usr/joe/wordcount/input/file01
- /usr/joe/wordcount/input/file02
+ $ bin/hadoop fs -ls /user/joe/wordcount/input/
+ /user/joe/wordcount/input/file01
+ /user/joe/wordcount/input/file02

- $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
+ $ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

- $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
+ $ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

Run the application:

- $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount - /usr/joe/wordcount/input /usr/joe/wordcount/output + $ bin/hadoop jar /user/joe/wordcount.jar org.myorg.WordCount + /user/joe/wordcount/input /user/joe/wordcount/output

Output:

- $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 + $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
@@ -602,19 +376,27 @@ Hello 2
World 2

+ + +
+ Bundling a data payload with your application -

Applications can specify a comma separated list of paths which +

Applications can specify a comma-separated list of paths which would be present in the current working directory of the task using the option -files. The -libjars option allows applications to add jars to the classpaths of the maps and reduces. The option -archives allows them to pass a comma-separated list of archives as arguments. These archives are unarchived and a link with the name of the archive is created in - the current working directory of tasks. More - details about the command line options are available at + the current working directory of tasks. The mechanism that + provides this functionality is called the distributed cache. + More details about the command line options surrounding job launching + and control of the distributed cache are available at Hadoop Commands Guide.

-

Running wordcount example with +

Hadoop ships with some example code in a jar precompiled for you; + one of these is (another) wordcount program. Here's an example + invocation of the wordcount example with -libjars, -files and -archives:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt @@ -634,18 +416,26 @@ Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by tasks using the symbolic names dict1 and dict2 respectively. And the archive mytar.tgz will be placed and unarchived into a - directory by the name tgzdir + directory by the name tgzdir.

+ +

The distributed cache is also described in greater detail further + down in this tutorial.

Walk-through -

The WordCount application is quite straight-forward.

+

This section describes the operation of the WordCount + application shown earlier in this tutorial.

-

The Mapper implementation (lines 14-26), via the - map method (lines 18-25), processes one line at a time, - as provided by the specified TextInputFormat (line 49). +

The Mapper + implementation (lines 16-30), via the + map method (lines 21-29), processes one line at a time, + as provided by the specified TextInputFormat (line 57). It then splits the line into tokens separated by whitespaces, via the StringTokenizer, and emits a key-value pair of < <word>, 1>.

@@ -671,10 +461,12 @@ tutorial.

WordCount also specifies a combiner (line - 46). Hence, the output of each map is passed through the local combiner - (which is same as the Reducer as per the job - configuration) for local aggregation, after being sorted on the - keys.

+ 54). Hence, the output of each map is passed through the local combiner + (which is the same as the Reducer + as per the job configuration) for local aggregation, after being + sorted on the keys.

The output of the first map:
@@ -690,9 +482,12 @@ < Hello, 1>

-

The Reducer implementation (lines 28-36), via the - reduce method (lines 29-35) just sums up the values, - which are the occurence counts for each key (i.e. words in this example). +

The Reducer + implementation (lines 32-43), via the + reduce method (lines 34-42) just sums up the values, + which are the occurrence counts for each key (i.e. words in this + example).

@@ -706,12 +501,19 @@

The run method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value - types, input/output formats etc., in the JobConf. - It then calls the JobClient.runJob (line 55) to submit the - and monitor its progress.

- -

We'll learn more about JobConf, JobClient, - Tool and other interfaces and classes a bit later in the + types, input/output formats etc., in the Job. + It then calls the Job.waitForCompletion() (line 63) + to submit the job to Hadoop and monitor its progress.

+ +

We'll learn more about Job, + Mapper, + Tool + and other interfaces and classes a bit later in the tutorial.

@@ -719,22 +521,35 @@
MapReduce - User Interfaces -

This section provides a reasonable amount of detail on every user-facing - aspect of the MapReduce framwork. This should help users implement, - configure and tune their jobs in a fine-grained manner. However, please - note that the javadoc for each class/interface remains the most - comprehensive documentation available; this is only meant to be a tutorial. +

This section provides a reasonable amount of detail on every + user-facing aspect of the MapReduce framework. This should help users + implement, configure and tune their jobs in a fine-grained manner. + However, please note that the javadoc for each class/interface remains + the most comprehensive documentation available; this is only meant to + be a tutorial.

-

Let us first take the Mapper and Reducer - interfaces. Applications typically implement them to provide the +

Let us first take the + Mapper and + Reducer + classes. Applications typically extend them to provide the map and reduce methods.

-

We will then discuss other core interfaces including - JobConf, JobClient, Partitioner, - OutputCollector, Reporter, - InputFormat, OutputFormat, - OutputCommitter and others.

+

We will then discuss other core classes including + Job, + Partitioner, + Context, + InputFormat, + OutputFormat, + OutputCommitter + and others.

Finally, we will wrap up by discussing some useful features of the framework such as the DistributedCache, @@ -743,16 +558,17 @@

Payload -

Applications typically implement the Mapper and - Reducer interfaces to provide the map and +

Applications typically extend the Mapper and + Reducer classes to provide the map and reduce methods. These form the core of the job.

Mapper -

- Mapper maps input key/value pairs to a set of intermediate - key/value pairs.

+

Mapper + maps input key/value pairs to a set of + intermediate key/value pairs.

Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need @@ -760,29 +576,78 @@ map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each - InputSplit generated by the InputFormat for - the job.

- -

Overall, Mapper implementations are passed the - JobConf for the job via the - - JobConfigurable.configure(JobConf) method and override it to - initialize themselves. The framework then calls - - map(WritableComparable, Writable, OutputCollector, Reporter) for - each key/value pair in the InputSplit for that task. - Applications can then override the - - Closeable.close() method to perform any required cleanup.

- + InputSplit + generated by the + InputFormat + for the job. An InputSplit is a logical representation of + a unit of input work for a map task; e.g., a filename and a byte + range within that file to process. The InputFormat is + responsible for enumerating the InputSplits, and + producing a + RecordReader + which will turn those + logical work units into actual physical input records.

+ +

Overall, Mapper implementations are specified in the + Job, + a client-side class that describes the job's + configuration and interfaces with the cluster on behalf of the + client program. The Mapper itself then is instantiated + in the running job, and is passed a MapContext object + which it can use to configure itself. The Mapper + contains a run() method which calls its + setup() + method once, its map() method for each input record, + and finally its cleanup() method. All of these methods + (including run() itself) can be overridden with + your own code. If you do not override any methods (leaving even + map as-is), it will act as the identity function, emitting + each input record as a separate output.
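The following is an illustrative sketch (not part of the tutorial's WordCount) of a Mapper that overrides all three lifecycle methods; the class and field names are invented:

  public static class LineLengthMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text token = new Text("line");
    private final IntWritable length = new IntWritable();

    protected void setup(Context context)
        throws IOException, InterruptedException {
      // called once per task, before the first call to map()
    }

    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      length.set(value.getLength());       // emit one output record per input record
      context.write(token, length);
    }

    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // called once per task, after the last call to map()
    }
  }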

+ +

The Context object allows the mapper to interact + with the rest of the Hadoop system. It includes configuration + data for the job, as well as interfaces which allow it to emit + output. The getConfiguration() method returns a + + Configuration which contains configuration data + for your program. You can set arbitrary (key, value) pairs of + configuration data in your Job, e.g. with + Job.getConfiguration().set("myKey", "myVal"), + and then retrieve this data in your mapper with + Context.getConfiguration().get("myKey"). This sort of + functionality is typically done in the Mapper's + setup() + method.
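A small hedged sketch of that pattern, reusing the "myKey"/"myVal" names from the text above; the surrounding driver and mapper classes are assumed:

  // Driver side, before the job is submitted:
  Job job = new Job(getConf());
  job.getConfiguration().set("myKey", "myVal");

  // Mapper side:
  private String myVal;

  protected void setup(Context context)
      throws IOException, InterruptedException {
    // read the job-wide setting once per task, with a default if it was never set
    myVal = context.getConfiguration().get("myKey", "unset");
  }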

+ +

The + Mapper.run() + method then calls + map(KeyInType, ValInType, Context) for + each key/value pair in the InputSplit for that task. + Note that in the WordCount program's map() method, we then emit + our output data via the Context argument, using its + write() method. +

+ +

Applications can then override the Mapper's + cleanup() + method to perform any required teardown operations.

Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs - are collected with calls to - - OutputCollector.collect(WritableComparable,Writable).

+ are collected with calls to + Context.write(KeyOutType, ValOutType).

-

Applications can use the Reporter to report +

Applications can also use the Context to report progress, set application-level status messages and update Counters, or just indicate that they are alive.

@@ -790,18 +655,26 @@ subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via - - JobConf.setOutputKeyComparatorClass(Class).

+ Job.setGroupingComparatorClass(Class). + If a grouping comparator is not specified, then all values with the + same key will be presented by an unordered Iterable to + a call to the Reducer.reduce() method.

-

The Mapper outputs are sorted and then +

The Mapper outputs are sorted and partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by - implementing a custom Partitioner.

+ implementing a custom + Partitioner.

Users can optionally specify a combiner, via - - JobConf.setCombinerClass(Class), to perform local aggregation of + Job.setCombinerClass(Class), + to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

@@ -811,7 +684,7 @@ Applications can control if, and how, the intermediate outputs are to be compressed and the - CompressionCodec to be used via the JobConf. + CompressionCodec to be used via the Job.

@@ -826,35 +699,63 @@ maps take at least a minute to execute.

Thus, if you expect 10TB of input data and have a blocksize of - 128MB, you'll end up with 82,000 maps, unless - - setNumMapTasks(int) (which only provides a hint to the framework) - is used to set it even higher.

+ 128MB, you'll end up with 82,000 maps, unless the + mapreduce.job.maps parameter + (which only provides a hint to the + framework) is used to set it even higher. Ultimately, the number + of tasks is controlled by the number of splits returned by the + InputFormat.getSplits() method (which you can + override). +

Reducer -

- Reducer reduces a set of intermediate values which share a key to - a smaller set of values.

- -

The number of reduces for the job is set by the user - via - JobConf.setNumReduceTasks(int).

- -

Overall, Reducer implementations are passed the - JobConf for the job via the - - JobConfigurable.configure(JobConf) method and can override it to - initialize themselves. The framework then calls - - reduce(WritableComparable, Iterator, OutputCollector, Reporter) - method for each <key, (list of values)> - pair in the grouped inputs. Applications can then override the - - Closeable.close() method to perform any required cleanup.

+

Reducer + reduces a set of intermediate values which + share a key to a (usually smaller) set of values.

+ +

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).

+ +

The API of Reducer is very similar to that of + Mapper; there's a run() method that receives + a Context containing the job's configuration as + well as interfacing methods that return data from the reducer itself + back to the framework. The run() method calls setup() once, + reduce() once for each key associated with the + reduce task, and cleanup() + once at the end. Each of these methods + can access the job's configuration data by using + Context.getConfiguration().

+ +

As in Mapper, any or all of these methods can be + overridden with custom implementations. If none of these methods are + overridden, the default reducer operation is the identity function; + values are passed through without further processing.

+ +

The heart of Reducer is its reduce() + method. This is called once per key; the second argument is an + Iterable which returns all the values associated with + that key. In the WordCount example, this is all of the 1's or other + partial counts associated with a given word. The Reducer should + emit its final output (key, value) pairs with the + Context.write() method. It may emit 0, 1, or more + (key, value) pairs for each input.
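For example (an illustrative reducer, not part of WordCount), a reduce() that drops keys whose total falls below a threshold, so it emits either zero or one pair per input key:

  public static class ThresholdReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();                  // add up the partial counts for this key
      }
      if (sum >= 10) {                     // emit 0 or 1 output pairs per key
        context.write(key, new IntWritable(sum));
      }
    }
  }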

Reducer has 3 primary phases: shuffle, sort and reduce.

@@ -882,12 +783,12 @@

If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via - - JobConf.setOutputValueGroupingComparator(Class). Since - - JobConf.setOutputKeyComparatorClass(Class) can be used to - control how intermediate keys are grouped, these can be used in - conjunction to simulate secondary sort on values.

+ Job.setGroupingComparatorClass(Class). Since this + controls only how intermediate keys are grouped, while the sort order + of the full keys is preserved, the two can be used in conjunction to + simulate a secondary sort on values.
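A hedged sketch of that idea for Text keys of the form "primary<TAB>secondary"; the comparator class is illustrative and assumes the full key already sorts by both fields:

  public static class FirstFieldGroupingComparator extends WritableComparator {
    public FirstFieldGroupingComparator() {
      super(Text.class, true);             // ask WritableComparator to create Text instances
    }

    public int compare(WritableComparable a, WritableComparable b) {
      // group on the part of the key before the tab; the full key still controls
      // the sort order, so each group's values arrive ordered by the second field
      String left = ((Text) a).toString().split("\t", 2)[0];
      String right = ((Text) b).toString().split("\t", 2)[0];
      return left.compareTo(right);
    }
  }

  // In the driver:
  job.setGroupingComparatorClass(FirstFieldGroupingComparator.class);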

@@ -895,20 +796,22 @@ Reduce

In this phase the - - reduce(WritableComparable, Iterator, OutputCollector, Reporter) - method is called for each <key, (list of values)> - pair in the grouped inputs.

- + reduce(MapOutKeyType, + Iterable<MapOutValType>, Context) + method is called for each <key, (list of + values)> pair in the grouped inputs.

+

The output of the reduce task is typically written to the FileSystem via - - OutputCollector.collect(WritableComparable, Writable).

+ Context.write(ReduceOutKeyType, ReduceOutValType).

-

Applications can use the Reporter to report +

Applications can use the Context to report progress, set application-level status messages and update - Counters, or just indicate that they are alive.

+ Counters, + or just indicate that they are alive.

The output of the Reducer is not sorted.

@@ -926,12 +829,13 @@ reduces and launch a second wave of reduces doing a much better job of load balancing.

-

Increasing the number of reduces increases the framework overhead, - but increases load balancing and lowers the cost of failures.

+

Increasing the number of reduces increases the framework + overhead, but increases load balancing and lowers the cost of + failures.

The scaling factors above are slightly less than whole numbers to - reserve a few reduce slots in the framework for speculative-tasks and - failed tasks.

+ reserve a few reduce slots in the framework for speculative-tasks + and failed tasks.

@@ -942,7 +846,7 @@

In this case the outputs of the map-tasks go directly to the FileSystem, into the output path set by - + setOutputPath(Path). The framework does not sort the map-outputs before writing them out to the FileSystem.

@@ -951,14 +855,15 @@
Mark-Reset -

While applications iterate through the values for a given key, it is - possible to mark the current position and later reset the iterator to - this position and continue the iteration process. The corresponding - methods are mark() and reset(). +

While applications iterate through the values for a given key, it + is possible to mark the current position and later reset the + iterator to this position and continue the iteration process. + The corresponding methods are mark() and + reset().

mark() and reset() can be called any - number of times during the iteration cycle. The reset() + number of times during the iteration cycle. The reset() method will reset the iterator to the last record before a call to the previous mark().

@@ -1005,7 +910,7 @@    - values.mark(); + mitr.mark(); @@ -1014,14 +919,14 @@    - while (values.hasNext()) { + while (mitr.hasNext()) {      - i = values.next(); + i = mitr.next(); @@ -1051,7 +956,7 @@    - values.reset(); + mitr.reset(); @@ -1067,7 +972,7 @@    - // call to values.next() in this example, we will iterate over all + // call to mitr.next() in this example, we will iterate over all @@ -1081,14 +986,14 @@    - while (values.hasNext()) { + while (mitr.hasNext()) {      - i = values.next(); + i = mitr.next(); @@ -1123,8 +1028,8 @@
Partitioner -

- Partitioner partitions the key space.

+

+ Partitioner partitions the key space.

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to @@ -1133,103 +1038,111 @@ job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.

-

- HashPartitioner is the default Partitioner.

+

HashPartitioner is the default + Partitioner.
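For illustration, a hedged sketch of a custom Partitioner that routes keys by their first character (the class name and the policy are invented):

  public static class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      // keys that start with the same letter go to the same reduce task
      String s = key.toString();
      int first = s.isEmpty() ? 0 : Character.toLowerCase(s.charAt(0));
      return (first & Integer.MAX_VALUE) % numPartitions;
    }
  }

  // In the driver:
  job.setPartitionerClass(FirstLetterPartitioner.class);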

- Reporter + Reporting Progress -

- Reporter is a facility for MapReduce applications to report - progress, set application-level status messages and update - Counters.

+

Via the mapper or reducer's Context, MapReduce applications can + report progress, set application-level status messages and update + Counters.
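A short hedged sketch of those calls inside a map() method; processSlowly() stands in for expensive per-record work and is hypothetical:

  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String part : value.toString().split(",")) {
      processSlowly(part);                 // hypothetical long-running work
      context.progress();                  // tell the framework this task is still alive
    }
    context.setStatus("processed input record at offset " + key.get());
    context.getCounter("MyApp", "RECORDS").increment(1);
  }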

-

Mapper and Reducer implementations can use - the Reporter to report progress or just indicate +

Mapper and Reducer implementations can + use the Context to report progress or just indicate that they are alive. In scenarios where the application takes a significant amount of time to process individual key/value pairs, this is crucial since the framework might assume that the task has timed-out and kill that task. Another way to avoid this is to - set the configuration parameter mapreduce.task.timeout to a - high-enough value (or even set it to zero for no time-outs). + set the configuration parameter mapreduce.task.timeout + to a high-enough value (or even set it to zero for no + time-outs).

Applications can also update Counters using the - Reporter.

-
- -
- OutputCollector - -

- OutputCollector is a generalization of the facility provided by - the MapReduce framework to collect data output by the - Mapper or the Reducer (either the - intermediate outputs or the output of the job).

+ Context.

Hadoop MapReduce comes bundled with a - - library of generally useful mappers, reducers, and partitioners.

+ library of generally useful mappers, reducers, and partitioners + in the org.apache.hadoop.mapreduce.lib package.

Job Configuration -

- JobConf represents a MapReduce job configuration.

+

The Job represents a MapReduce job configuration. + The actual state for this object is written to an underlying instance of + Configuration.

-

JobConf is the primary interface for a user to describe +

Job is the primary interface for a user to describe a MapReduce job to the Hadoop framework for execution. The framework - tries to faithfully execute the job as described by JobConf, + tries to faithfully execute the job as described by Job, however:

    -
  • f +
  • Some configuration parameters may have been marked as final by administrators and hence cannot be altered.
  • While some job parameters are straight-forward to set (e.g. - - setNumReduceTasks(int)), other parameters interact subtly with - the rest of the framework and/or job configuration and are - more complex to set (e.g. - - setNumMapTasks(int)). + setNumReduceTasks(int)), other parameters interact + subtly with the rest of the framework and/or job configuration + and are more complex to set (e.g. mapreduce.job.maps).
-

JobConf is typically used to specify the +

The Job is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, OutputFormat and OutputCommitter - implementations. JobConf also + implementations. Job also indicates the set of input files - (setInputPaths(JobConf, Path...) - /addInputPath(JobConf, Path)) - and (setInputPaths(JobConf, String) - /addInputPaths(JobConf, String)) + (setInputPaths(Job, Path...) + /addInputPath(Job, Path)) + and (setInputPaths(Job, String) + /addInputPaths(Job, String)) and where the output files should be written - (setOutputPath(Path)).

+ (setOutputPath(Path)).

-

Optionally, JobConf is used to specify other advanced +

Optionally, Job is used to specify other advanced facets of the job such as the Comparator to be used, files to be put in the DistributedCache, whether intermediate and/or job outputs are to be compressed (and how), debugging via - user-provided scripts - (setMapDebugScript(String)/setReduceDebugScript(String)) - , whether job tasks can be executed in a speculative manner - (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) + user-provided scripts, + whether job tasks can be executed in a speculative manner + (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) , maximum number of attempts per task - (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) + (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) , percentage of tasks failure which can be tolerated by the job - (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) - etc.

- -

Of course, users can use - set(String, String)/get(String, String) + (Job.getConfiguration().setInt(Job.MAP_FAILURES_MAX_PERCENT, + int)/Job.getConfiguration().setInt(Job.REDUCE_FAILURES_MAX_PERCENT, + int)), etc.

+ +

Of course, users can use Job.getConfiguration() to get + access to the underlying configuration state, and can then use + set(String, + String)/get(String, String) to set/get arbitrary parameters needed by applications. However, use the DistributedCache for large amounts of (read-only) data.

@@ -1244,7 +1157,7 @@

The child-task inherits the environment of the parent TaskTracker. The user can specify additional options to the child-jvm via the mapred.{map|reduce}.child.java.opts - configuration parameter in the JobConf such as non-standard + configuration parameter in the job configuration such as non-standard paths for the run-time linker to search shared libraries via -Djava.library.path=<> etc. If the mapred.{map|reduce}.child.java.opts parameters contains the @@ -1295,7 +1208,7 @@ that the value set here is a per process limit. The value for mapred.{map|reduce}.child.ulimit should be specified in kilo bytes (KB). And also the value must be greater than - or equal to the -Xmx passed to JavaVM, else the VM might not start. + or equal to the -Xmx passed to JavaVM, or else the VM might not start.
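As a hedged illustration of setting these properties from the driver (the values are examples only, not recommendations):

  Configuration conf = job.getConfiguration();
  // extra JVM options for map-side child tasks, including a run-time linker path
  conf.set("mapred.map.child.java.opts",
           "-Xmx512m -Djava.library.path=/opt/native/lib");
  // per-process virtual memory limit for reduce-side children, in kilobytes (KB);
  // it must be at least as large as the -Xmx passed to the child JVM
  conf.set("mapred.reduce.child.ulimit", "1048576");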

Note: mapred.{map|reduce}.child.java.opts are used only @@ -1366,7 +1279,7 @@

  • If the spill threshold is exceeded while a spill is in progress, collection will continue until the spill is finished. For - example, if io.sort.buffer.spill.percent is set to + example, if mapreduce.map.sort.spill.percent is set to 0.33, and the remainder of the buffer is filled while the spill runs, the next spill will include all the collected records, or 0.66 of the buffer, and will not generate additional spills. In @@ -1481,9 +1394,7 @@ : The job-specific shared directory. The tasks can use this space as scratch space and share files among them. This directory is exposed to the users through the configuration property - mapreduce.job.local.dir. The directory can accessed through - api - JobConf.getJobLocalDir(). It is available as System property also. + mapreduce.job.local.dir. It is available as System property also. So, users (streaming etc.) can call System.getProperty("mapreduce.job.local.dir") to access the directory.
  • @@ -1495,9 +1406,9 @@ This directory is extracted from job.jar and its contents are automatically added to the classpath for each task. The job.jar location is accessible to the application through the api - - JobConf.getJar() . To access the unjarred directory, - JobConf.getJar().getParent() can be called. + + Job.getJar() . To access the unjarred directory, + Job.getJar().getParent() can be called.
  • ${mapreduce.cluster.local.dir}/taskTracker/jobcache/$jobid/job.xml : The job.xml file, the generic job configuration, localized for the job.
  • @@ -1546,8 +1457,7 @@ (i.e. 1 task per JVM). If it is -1, there is no limit to the number of tasks a JVM can run (of the same job). One can also specify some value greater than 1 using the api - - JobConf.setNumTasksToExecutePerJvm(int)

    + Job.getConfiguration().setInt(Job.JVM_NUM_TASKS_TO_RUN, int).

@@ -1616,11 +1526,11 @@
Job Submission and Monitoring -

- JobClient is the primary interface by which user-job interacts +

The Job + is the primary interface by which user-job interacts with the JobTracker.

-

JobClient provides facilities to submit jobs, track their +

Job provides facilities to submit jobs, track their progress, access component-tasks' reports and logs, get the MapReduce cluster's status information and so on.

@@ -1657,8 +1567,8 @@ to filter log files from the output directory listing.

Normally the user creates the application, describes various facets - of the job via JobConf, and then uses the - JobClient to submit the job and monitor its progress.

+ of the job via Job, and then uses the + waitForCompletion() method to submit the job and monitor its progress.

Job Control @@ -1673,22 +1583,20 @@ complete (success/failure) lies squarely on the clients. In such cases, the various job-control options are:

    -
  • - - runJob(JobConf) : Submits the job and returns only after the +
  • Job.waitForCompletion() : + Submits the job and returns only after the job has completed.
  • - - submitJob(JobConf) : Only submits the job, then poll the - returned handle to the - - RunningJob to query status and make scheduling decisions. + Job.submit() : Only submits the job;, then poll the + other methods of Job such as isComplete(), + isSuccessful(), etc. + to query status and make scheduling decisions.
  • - - JobConf.setJobEndNotificationURI(String) : Sets up a - notification upon job-completion, thus avoiding polling. + Job.getConfiguration().set(Job.END_NOTIFICATION_URL, String) + : Sets up a notification upon job-completion, thus avoiding polling.
@@ -1697,7 +1605,7 @@
Job Input -

+

InputFormat describes the input-specification for a MapReduce job.

@@ -1719,7 +1627,7 @@

The default behavior of file-based InputFormat implementations, typically sub-classes of - + FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the @@ -1733,7 +1641,7 @@ record-oriented view of the logical InputSplit to the individual task.

-

+

TextInputFormat is the default InputFormat.

If TextInputFormat is the InputFormat for a @@ -1746,7 +1654,7 @@

InputSplit -

+

InputSplit represents the data to be processed by an individual Mapper.

@@ -1754,7 +1662,7 @@ the input, and it is the responsibility of RecordReader to process and present a record-oriented view.

-

+

FileSplit is the default InputSplit. It sets mapreduce.map.input.file to the path of the input file for the logical split.

@@ -1763,7 +1671,7 @@
RecordReader -

+

RecordReader reads <key, value> pairs from an InputSplit.

@@ -1779,7 +1687,7 @@
Job Output -

+

OutputFormat describes the output-specification for a MapReduce job.

@@ -1803,7 +1711,7 @@
Lazy Output Creation

It is possible to delay creation of output until the first write attempt - by using + by using LazyOutputFormat. This is particularly useful in preventing the creation of zero byte files when there is no call to output.collect (or Context.write). This is achieved by calling the static method @@ -1813,8 +1721,8 @@

- import org.apache.hadoop.mapred.lib.LazyOutputFormat;
- LazyOutputFormat.setOutputFormatClass(conf, TextOutputFormat.class); + import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+ LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

@@ -1822,7 +1730,7 @@
OutputCommitter -

+

OutputCommitter describes the commit of task output for a MapReduce job.

@@ -1863,7 +1771,10 @@ will be launched with same attempt-id to do the cleanup. -

FileOutputCommitter is the default +

FileOutputCommitter + is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce slots, whichever is free on the TaskTracker. And JobCleanup task, TaskCleanup tasks and JobSetup task have the highest @@ -1887,20 +1798,22 @@

To avoid these issues the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special - ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} sub-directory + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory accessible via ${mapreduce.task.output.dir} for each task-attempt on the FileSystem where the output of the task-attempt is stored. On successful completion of the task-attempt, the files in the - ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) - are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + (only) are promoted to + ${mapreduce.output.fileoutputformat.outputdir}. Of course, the framework discards the sub-directory of unsuccessful task-attempts. This process is completely transparent to the application.

The application-writer can take advantage of this feature by creating any side-files required in ${mapreduce.task.output.dir} during execution of a task via - + FileOutputFormat.getWorkOutputPath(), and the framework will promote them similarly for successful task-attempts, thus eliminating the need to pick unique paths per task-attempt.

@@ -1910,7 +1823,7 @@ ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}, and this value is set by the MapReduce framework. So, just create any side-files in the path returned by - + FileOutputFormat.getWorkOutputPath() from MapReduce task to take advantage of this feature.

@@ -1922,7 +1835,7 @@
RecordWriter -

+

RecordWriter writes the output <key, value> pairs to an output file.

@@ -1950,30 +1863,32 @@ support multiple queues.

A job defines the queue it needs to be submitted to through the - mapreduce.job.queuename property, or through the - setQueueName(String) - API. Setting the queue name is optional. If a job is submitted + mapreduce.job.queuename property. + Setting the queue name is optional. If a job is submitted without an associated queue name, it is submitted to the 'default' queue.

Counters -

Counters represent global counters, defined either by - the MapReduce framework or applications. Each Counter can +

Counters represent global counters, defined either by + the MapReduce framework or applications. Each Counter can be of any Enum type. Counters of a particular Enum are bunched into groups of type Counters.Group.

Applications can define arbitrary Counters (of type - Enum) and update them via - - Reporter.incrCounter(Enum, long) or - - Reporter.incrCounter(String, String, long) - in the map and/or - reduce methods. These counters are then globally - aggregated by the framework.

+ Enum); get a Counter object from the task's + Context with the getCounter() method, and then call + the Counter.increment(long) method to increment its + value locally. These counters are then globally aggregated by the framework.
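A minimal sketch, assuming an application-defined enum; the enum and the surrounding map() logic are illustrative:

  // Application-defined counters; each constant becomes a counter in a group
  // named after the enum.
  public enum WordCounters { TOTAL_WORDS, SKIPPED_WORDS }

  // Inside map() or reduce():
  if (word.getLength() == 0) {
    context.getCounter(WordCounters.SKIPPED_WORDS).increment(1);
  } else {
    context.getCounter(WordCounters.TOTAL_WORDS).increment(1);
    context.write(word, one);
  }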

@@ -1988,7 +1903,7 @@ needed by applications.

Applications specify the files to be cached via urls (hdfs://) - in the JobConf. The DistributedCache + in the Job. The DistributedCache assumes that the files specified via hdfs:// urls are already present on the FileSystem.

@@ -2082,21 +1997,21 @@ or {@link org.apache.hadoop.mapreduce.Reducer}:
public static class MapClass extends Mapper<K, V, K, V> {
- private Path[] localArchives;
- private Path[] localFiles;
- public void setup(Context context) {
- // Get the cached archives/files
- localArchives = context.getLocalCacheArchives();
- localFiles = context.getLocalCacheFiles();
- }
+   private Path[] localArchives;
+   private Path[] localFiles;
+   public void setup(Context context) {
+     // Get the cached archives/files
+     localArchives = context.getLocalCacheArchives();
+     localFiles = context.getLocalCacheFiles();
+   }
- public void map(K key, V value, +   public void map(K key, V value, Context context) throws IOException {
- // Use data from the cached archives/files here
- // ...
- // ...
- context.write(k, v);
- }
+     // Use data from the cached archives/files here
+     // ...
+     // ...
+     context.write(k, v);
+   }
}

@@ -2170,8 +2085,8 @@ information for some of the tasks in the job by setting the configuration property mapreduce.task.profile. The value can be set using the api - - JobConf.setProfileEnabled(boolean). If the value is set + + Job.setProfileEnabled(boolean). If the value is set true, the task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.

@@ -2180,16 +2095,16 @@ the configuration property mapreduce.task.profile.{maps|reduces} to set the ranges of MapReduce tasks to profile. The value can be set using the api - - JobConf.setProfileTaskRange(boolean,String). + + Job.setProfileTaskRange(boolean,String). By default, the specified range is 0-2.

User can also specify the profiler configuration arguments by setting the configuration property mapreduce.task.profile.params. The value can be specified using the api - - JobConf.setProfileParams(String). If the string contains a + + Job.setProfileParams(String). If the string contains a %s, it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for @@ -2224,10 +2139,9 @@ properties mapreduce.map.debug.script and mapreduce.reduce.debug.script, for debugging map and reduce tasks respectively. These properties can also be set by using APIs - - JobConf.setMapDebugScript(String) and - - JobConf.setReduceDebugScript(String) . In streaming mode, a debug + Job.getConfiguration().set(Job.MAP_DEBUG_SCRIPT, String) + and Job.getConfiguration().set(Job.REDUCE_DEBUG_SCRIPT, + String). In streaming mode, a debug script can be submitted with the command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively.

@@ -2280,32 +2194,30 @@ Intermediate Outputs

Applications can control compression of intermediate map-outputs - via the - - JobConf.setCompressMapOutput(boolean) api and the - CompressionCodec to be used via the - - JobConf.setMapOutputCompressorClass(Class) api.

+ via the Job.getConfiguration().setBoolean(Job.MAP_OUTPUT_COMPRESS, bool) + api and the CompressionCodec to be used via the + Job.getConfiguration().setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, Class, + CompressionCodec.class) api.

Job Outputs

Applications can control compression of job-outputs via the - - FileOutputFormat.setCompressOutput(JobConf, boolean) api and the + + FileOutputFormat.setCompressOutput(Job, boolean) api and the CompressionCodec to be used can be specified via the - - FileOutputFormat.setOutputCompressorClass(JobConf, Class) api.

+ + FileOutputFormat.setOutputCompressorClass(Job, Class) api.

If the job outputs are to be stored in the - + SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK - defaults to RECORD) can be specified via the - - SequenceFileOutputFormat.setOutputCompressionType(JobConf, + + SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api.

@@ -2370,16 +2282,16 @@ bad records. A task will be re-executed till the acceptable skipped value is met or all task attempts are exhausted. To increase the number of task attempts, use - - JobConf.setMaxMapAttempts(int) and - - JobConf.setMaxReduceAttempts(int). + + Job.setMaxMapAttempts(int) and + + Job.setMaxReduceAttempts(int).

Skipped records are written to HDFS in the sequence file format, for later analysis. The location can be changed through [... 1242 lines stripped ...]