Author: cutting
Date: Fri Mar 2 12:03:35 2007
New Revision: 513924
URL: http://svn.apache.org/viewvc?view=rev&rev=513924
Log:
HADOOP-1032. Permit one to specify jars that will be cached across multiple jobs. Contributed
by Gautam Kowshik.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513924&r1=513923&r2=513924
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 2 12:03:35 2007
@@ -182,6 +182,9 @@
55. HADOOP-1041. Optimize mapred counter implementation. Also group
counters by their declaring Enum. (David Bowen via cutting)
+56. HADOOP-1032. Permit one to specify jars that will be cached
+ across multiple jobs. (Gautam Kowshik via cutting)
+
Release 0.11.2 - 2007-02-16
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=513924&r1=513923&r2=513924
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 2 12:03:35 2007
@@ -521,7 +521,80 @@
conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
+ uri.toString());
}
-
+
+ /**
+ * Add a file path to the current set of classpath entries. The entries are
+ * kept in "mapred.job.classpath.files", joined with the local path.separator.
+ * The file is also qualified via the FileSystem from conf and added to the cache.
+ * @param file Path of the file to be added
+ * @param conf Configuration that contains the classpath setting
+ */
+ public static void addFileToClassPath(Path file, Configuration conf)
+ throws IOException {
+ String classpath = conf.get("mapred.job.classpath.files");
+ conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
+ : classpath + System.getProperty("path.separator") + file.toString());
+ FileSystem fs = FileSystem.get(conf);
+ URI uri = fs.makeQualified(file).toUri();
+
+ addCacheFile(uri, conf);
+ }
+
+ /**
+ * Get the file entries in classpath as an array of Path.
+ * @param conf Configuration that contains the classpath setting
+ * @return the entries, or null if "mapred.job.classpath.files" is not set
+ */
+ public static Path[] getFileClassPaths(Configuration conf) {
+ String classpath = conf.get("mapred.job.classpath.files");
+ if (classpath == null)
+ return null;
+ ArrayList list = Collections.list(new StringTokenizer(classpath, System
+ .getProperty("path.separator")));
+ Path[] paths = new Path[list.size()];
+ for (int i = 0; i < list.size(); i++) {
+ paths[i] = new Path((String) list.get(i));
+ }
+ return paths;
+ }
+
+ /**
+ * Add an archive path to the current set of classpath entries. The entries are
+ * kept in "mapred.job.classpath.archives", joined with the local path.separator.
+ * The archive is also qualified via the FileSystem from conf and added to the cache.
+ * @param archive Path of the archive to be added
+ * @param conf Configuration that contains the classpath setting
+ */
+ public static void addArchiveToClassPath(Path archive, Configuration conf)
+ throws IOException {
+ String classpath = conf.get("mapred.job.classpath.archives");
+ conf.set("mapred.job.classpath.archives", classpath == null ? archive
+ .toString() : classpath + System.getProperty("path.separator")
+ + archive.toString());
+ FileSystem fs = FileSystem.get(conf);
+ URI uri = fs.makeQualified(archive).toUri();
+
+ addCacheArchive(uri, conf);
+ }
+
+ /**
+ * Get the archive entries in classpath as an array of Path.
+ * @param conf Configuration that contains the classpath setting
+ * @return the entries, or null if "mapred.job.classpath.archives" is not set
+ */
+ public static Path[] getArchiveClassPaths(Configuration conf) {
+ String classpath = conf.get("mapred.job.classpath.archives");
+ if (classpath == null)
+ return null;
+ ArrayList list = Collections.list(new StringTokenizer(classpath, System
+ .getProperty("path.separator")));
+ Path[] paths = new Path[list.size()];
+ for (int i = 0; i < list.size(); i++) {
+ paths[i] = new Path((String) list.get(i));
+ }
+ return paths;
+ }
+
/**
* This method allows you to create symlinks in the current working directory
* of the task to all the cache files/archives
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=513924&r1=513923&r2=513924
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 2 12:03:35 2007
@@ -171,6 +171,45 @@
classPath.append(jobCacheDir);
}
+
+ // include the user specified classpath
+
+ //archive paths
+ Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+ if (archiveClasspaths != null && archives != null) {
+ Path[] localArchives = DistributedCache
+ .getLocalCacheArchives(conf);
+ if (localArchives != null){
+ for (int i=0;i<archives.length;i++){
+ for(int j=0;j<archiveClasspaths.length;j++){
+ if(archives[i].getPath().equals(
+ archiveClasspaths[j].toString())){
+ classPath.append(sep);
+ classPath.append(localArchives[i]
+ .toString());
+ }
+ }
+ }
+ }
+ }
+ //file paths
+ Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+ if(fileClasspaths!=null && files != null) {
+ Path[] localFiles = DistributedCache
+ .getLocalCacheFiles(conf);
+ if (localFiles != null) {
+ for (int i = 0; i < files.length; i++) {
+ for (int j = 0; j < fileClasspaths.length; j++) {
+ if (files[i].getPath().equals(
+ fileClasspaths[j].toString())) {
+ classPath.append(sep);
+ classPath.append(localFiles[i].toString());
+ }
+ }
+ }
+ }
+ }
+
classPath.append(sep);
classPath.append(workDir);
// Build exec child jmv args.
|