DistributedCache is a facility provided by the Map-Reduce
framework to cache files (text, archives, jars etc.) needed by applications.

Applications specify the files to be cached, via URLs (hdfs:// or http://),
in the {@link org.apache.hadoop.mapred.JobConf}. The DistributedCache
assumes that the files specified via URLs are already present on the
{@link FileSystem} at the path specified by the URL
and are accessible by every machine in the cluster.
The framework will copy the necessary files to the slave node before
any tasks for the job are executed on that node. Its efficiency stems from
the fact that the files are only copied once per job, and from the ability to
cache archives which are un-archived on the slaves.

DistributedCache can be used to distribute simple, read-only
data/text files and/or more complex types such as archives, jars etc.
Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
Jars may be optionally added to the classpath of the tasks, a rudimentary
software distribution mechanism. Files have execution permissions.
In older versions of Hadoop Map/Reduce, users could optionally ask for symlinks
to be created in the working directory of the child task. In the current
version, symlinks are always created. If the URL does not have a fragment,
the name of the file or directory will be used. If multiple files or
directories map to the same link name, the last one added will be used; all
others will not even be downloaded.
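
For example (a minimal sketch; the file names and paths below are illustrative),
a URI fragment controls the link name under which a cached file appears in the
task's working directory:

    // The fragment "#geo-lookup" determines the symlink name in the task's cwd.
    DistributedCache.addCacheFile(
        new URI("hdfs://namenode/myapp/lookup-v2.dat#geo-lookup"), job);

    // Inside the Mapper/Reducer the file can then be read via that link name.
    BufferedReader reader = new BufferedReader(new FileReader("geo-lookup"));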
DistributedCache tracks modification timestamps of the cache
files. Clearly, the cache files should not be modified by the application
or externally while the job is executing.
Here is an illustrative example on how to use the DistributedCache:

    // Setting up the cache for the application

    1. Copy the requisite files to the FileSystem:

    $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
    $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
    $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
    $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
    $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
    $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

    2. Setup the application's JobConf:

    JobConf job = new JobConf();
    DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
                                  job);
    DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
    DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
    DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
    DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
    DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);

    3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
    or {@link org.apache.hadoop.mapred.Reducer}:

    public static class MapClass extends MapReduceBase
    implements Mapper<K, V, K, V> {

      private Path[] localArchives;
      private Path[] localFiles;

      public void configure(JobConf job) {
        // Get the cached archives/files
        File f = new File("./map.zip/some/file/in/zip.txt");
      }

      public void map(K key, V value,
                      OutputCollector<K, V> output, Reporter reporter)
        throws IOException {
        // Use data from the cached archives/files here
        // ...
        // ...
        output.collect(key, value);
      }
    }

It is also very common to use the DistributedCache by using
{@link org.apache.hadoop.util.GenericOptionsParser}.

This class includes methods that should be used by users
(specifically those mentioned in the example above, as well
as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
as well as methods intended for use by the MapReduce framework
(e.g., {@link org.apache.hadoop.mapred.JobClient}).

@see org.apache.hadoop.mapred.JobConf
@see org.apache.hadoop.mapred.JobClient
@see org.apache.hadoop.mapreduce.Job]]>
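
The {@link org.apache.hadoop.util.GenericOptionsParser} route mentioned above
typically looks like the following sketch (file names and paths are illustrative;
the job's main class must run through org.apache.hadoop.util.ToolRunner so that
the generic options are actually parsed):

    $ bin/hadoop jar myapp.jar MyJob \
          -files hdfs://namenode/myapp/lookup.dat#lookup.dat \
          -libjars /myapp/mylib.jar \
          -archives hdfs://namenode/myapp/map.zip \
          input output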
JobTracker.]]>
ClusterStatus provides clients with information such as the size of the
cluster, the cluster's task capacity, the number of currently running map
and reduce tasks, and the state of the JobTracker.

Clients can query for the latest ClusterStatus, via
{@link JobClient#getClusterStatus()}.
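
For instance, a client might inspect the cluster like this (a minimal sketch;
error handling omitted):

    JobConf conf = new JobConf();
    JobClient client = new JobClient(conf);

    ClusterStatus status = client.getClusterStatus();
    System.out.println("Task trackers : " + status.getTaskTrackers());
    System.out.println("Running maps  : " + status.getMapTasks());
    System.out.println("Map capacity  : " + status.getMaxMapTasks());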
Counters represent global counters, defined either by the
Map-Reduce framework or applications. Each Counter can be of
any {@link Enum} type.

Counters are bunched into {@link Group}s, each comprising
counters from a particular Enum class.]]>
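
A minimal sketch of an application-defined Counter being updated from a map
task via {@link org.apache.hadoop.mapred.Reporter} (the enum and counter names
here are illustrative):

    public static class ParseMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      // Each enum constant becomes one counter; the enum class forms the group.
      static enum RecordQuality { GOOD, MALFORMED }

      public void map(LongWritable key, Text value,
                      OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
        if (value.toString().trim().isEmpty()) {
          reporter.incrCounter(RecordQuality.MALFORMED, 1);
          return;
        }
        reporter.incrCounter(RecordQuality.GOOD, 1);
        output.collect(value, new IntWritable(1));
      }
    }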
Group handles localization of the class name and the
counter names.
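
As an illustration (a sketch reusing the hypothetical RecordQuality enum from
the mapper sketch above, given the job's JobConf in 'job'), the submitting
client can read counters back, per group, once the job finishes:

    RunningJob runningJob = JobClient.runJob(job);
    Counters counters = runningJob.getCounters();

    // Look up the whole group (named after the enum class) or a single counter.
    Counters.Group group = counters.getGroup(RecordQuality.class.getName());
    long malformed = group.getCounter("MALFORMED");
    long good = counters.getCounter(RecordQuality.GOOD);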
FileInputFormat implementations can override this and return
false to ensure that individual input files are never split-up
so that {@link Mapper}s process entire files.

@param fs the file system that the file is on
@param filename the file name to check
@return is this file splitable?]]>
FileInputFormat is the base class for all file-based
InputFormats. This provides a generic implementation of
{@link #getSplits(JobConf, int)}.

Subclasses of FileInputFormat can also override the
{@link #isSplitable(FileSystem, Path)} method to ensure input-files are
not split-up and are processed as a whole by {@link Mapper}s.]]>
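
For example (a minimal sketch; the class name is illustrative), an input format
that always hands whole files to a single Mapper can be derived from
{@link TextInputFormat}:

    public static class WholeFileTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(FileSystem fs, Path file) {
        // Never split an input file, whatever its size or compression codec.
        return false;
      }
    }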
false otherwise]]>
Note: The following is valid only if the {@link OutputCommitter}
is {@link FileOutputCommitter}. If OutputCommitter is not
a FileOutputCommitter, the task's temporary output
directory is the same as {@link #getOutputPath(JobConf)}, i.e.
${mapreduce.output.fileoutputformat.outputdir}
Some applications need to create/write-to side-files, which differ from
the actual job-outputs.

In such cases there could be issues with two instances of the same TIP
(running simultaneously, e.g. speculative tasks) trying to open/write-to the
same file (path) on HDFS. Hence the application-writer will have to pick
unique names per task-attempt (e.g. using the attemptid, say
attempt_200709221812_0001_m_000000_0), not just per TIP.

To get around this the Map-Reduce framework helps the application-writer
out by maintaining a special
${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
sub-directory for each task-attempt on HDFS where the output of the
task-attempt goes. On successful completion of the task-attempt the files
in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only)
are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the
framework discards the sub-directory of unsuccessful task-attempts. This
is completely transparent to the application.

The application-writer can take advantage of this by creating any
side-files required in ${mapreduce.task.output.dir} during execution
of his reduce-task, i.e. via {@link #getWorkOutputPath(JobConf)}, and the
framework will move them out similarly - thus she doesn't have to pick
unique paths per task-attempt.

Note: the value of ${mapreduce.task.output.dir} during
execution of a particular task-attempt is actually
${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}, and this value is
set by the map-reduce framework. So, just create any side-files in the
path returned by {@link #getWorkOutputPath(JobConf)} from the map/reduce
task to take advantage of this feature.

The entire discussion holds true for maps of jobs with
reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
goes directly to HDFS.
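
A minimal sketch of a reducer writing such a side-file under these rules (the
side-file name and record types are illustrative):

    public static class SideFileReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      private FSDataOutputStream sideFile;

      public void configure(JobConf job) {
        try {
          // Side-file in ${mapreduce.task.output.dir}; promoted to the job
          // output directory only if this task-attempt succeeds.
          Path path = new Path(FileOutputFormat.getWorkOutputPath(job),
                               "reduce-side-data.txt");
          sideFile = path.getFileSystem(job).create(path);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      public void reduce(Text key, Iterator<IntWritable> values,
                         OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
        // ... normal job output via 'output', extra records via 'sideFile' ...
      }

      public void close() throws IOException {
        sideFile.close();
      }
    }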
@return the {@link Path} to the task's temporary output directory
for the map-reduce job.]]>

The given name is postfixed with the task type, 'm' for maps, 'r' for
reduces, and the task partition number. For example, given the name 'test'
running on the first map of the job, the generated name will be
'test-m-00000'.
@param conf the configuration for the job.
@param name the name to make unique.
@return a unique name across all tasks of the job.]]>

This method uses the {@link #getUniqueName} method to make the file name
unique for the task.
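
Assuming this describes a helper along the lines of
FileOutputFormat.getPathForCustomFile(JobConf, String) (that method name, and
the 'stats' file name below, are assumptions for illustration), a task might
use it like this:

    public void configure(JobConf job) {
      // For the first map task of the job this resolves to a path ending in
      // "stats-m-00000", unique across all tasks and created under the task's
      // work output directory.
      Path statsFile = FileOutputFormat.getPathForCustomFile(job, "stats");
      // ... open statsFile and write custom per-task data ...
    }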
@param conf the configuration for the job.
@param name the name for the file.
@return a unique path across all tasks of the job.]]>