hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r812079 - in /hadoop/mapreduce/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapred/
Date Mon, 07 Sep 2009 11:05:49 GMT
Author: ddas
Date: Mon Sep  7 11:05:49 2009
New Revision: 812079

URL: http://svn.apache.org/viewvc?rev=812079&view=rev
Log:
MAPREDUCE-898. Changes DistributedCache to use the new API. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
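
In user-facing terms, the change moves cache setup from the static DistributedCache helpers onto Job (submission side) and JobContext (task side). A minimal before/after sketch, assuming conf, job and context variables are already in scope (the names and paths here are illustrative, not from the patch):

    // Before (now deprecated): static helpers mutate the Configuration directly.
    DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), conf);
    Path[] cached = DistributedCache.getLocalCacheFiles(conf);     // task side

    // After: submission-side setup on Job, task-side lookup on the context.
    job.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"));
    Path[] localFiles = context.getLocalCacheFiles();              // task side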

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Sep  7 11:05:49 2009
@@ -289,6 +289,9 @@
     MAPREDUCE-370. Update MultipleOutputs to use the API, merge functionality
     of MultipleOutputFormat. (Amareshwari Sriramadasu via cdouglas)
 
+    MAPREDUCE-898. Changes DistributedCache to use the new API.
+    (Amareshwari Sriramadasu via ddas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Mon Sep  7 11:05:49 2009
@@ -1881,6 +1881,52 @@
           cached files that are symlinked into the working directory of the
           task can be used to distribute native libraries and load them.</p>
           
+          <p>The <code>DistributedCache</code> tracks modification timestamps
+          of the cache files/archives. Clearly the cache files/archives should
+          not be modified by the application or externally 
+          while the job is executing.</p>
+          
+          <p>Here is an illustrative example of how to use the 
+          <code>DistributedCache</code>:<br/>
+           // Setting up the cache for the application
+           1. Copy the requisite files to the <code>FileSystem</code>:<br/>
+            <code>$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat</code><br/>
+            <code>$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip </code><br/>
+            <code>$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar</code><br/>
+            <code>$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar</code><br/>
+            <code>$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz</code><br/>
+            <code>$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz</code><br/>
+           2. Setup the job<br/>
+            <code>Job job = new Job(conf);</code><br/>
+            <code>job.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"));</code><br/>
+            <code>job.addCacheArchive(new URI("/myapp/map.zip"));</code><br/>
+            <code>job.addFileToClassPath(new Path("/myapp/mylib.jar"));</code><br/>
+            <code>job.addCacheArchive(new URI("/myapp/mytar.tar"));</code><br/>
+            <code>job.addCacheArchive(new URI("/myapp/mytgz.tgz"));</code><br/>
+            <code>job.addCacheArchive(new URI("/myapp/mytargz.tar.gz"));</code><br/>
+      
+           3. Use the cached files in the 
+              <code>{@link org.apache.hadoop.mapreduce.Mapper}
+              or {@link org.apache.hadoop.mapreduce.Reducer}:</code><br/>
+      
+              <code>public static class MapClass extends Mapper&lt;K, V, K, V&gt; {</code><br/>
+                <code>private Path[] localArchives;</code><br/>
+                <code>private Path[] localFiles;</code><br/>
+                <code>public void setup(Context context) {</code><br/>
+                 <code>// Get the cached archives/files</code><br/>
+                 <code>localArchives = context.getLocalCacheArchives();</code><br/>
+                 <code>localFiles = context.getLocalCacheFiles();</code><br/>
+              <code>}</code><br/>
+        
+              <code>public void map(K key, V value, 
+                  Context context) throws IOException {</code><br/>
+                <code>// Use data from the cached archives/files here</code><br/>
+                <code>// ...</code><br/>
+                <code>// ...</code><br/>
+                <code>context.write(k, v);</code><br/>
+              <code>}</code><br/>
+            <code>}</code></p>
+          
         </section>
         
         <section>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Mon Sep  7 11:05:49 2009
@@ -19,11 +19,13 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
@@ -302,6 +304,97 @@
   }
 
   /**
+   * Set the given set of archives
+   * @param archives The list of archives that need to be localized
+   */
+  public void setCacheArchives(URI[] archives) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.setCacheArchives(archives, conf);
+  }
+
+  /**
+   * Set the given set of files
+   * @param files The list of files that need to be localized
+   */
+  public void setCacheFiles(URI[] files) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.setCacheFiles(files, conf);
+  }
+
+  /**
+   * Add an archive to be localized
+   * @param uri The uri of the cache to be localized
+   */
+  public void addCacheArchive(URI uri) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addCacheArchive(uri, conf);
+  }
+  
+  /**
+   * Add a file to be localized
+   * @param uri The uri of the cache to be localized
+   */
+  public void addCacheFile(URI uri) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addCacheFile(uri, conf);
+  }
+
+  /**
+   * Add a file path to the current set of classpath entries. It adds the file
+   * to the cache as well.
+   * 
+   * @param file Path of the file to be added
+   */
+  public void addFileToClassPath(Path file)
+    throws IOException {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addFileToClassPath(file, conf);
+  }
+
+  /**
+   * Add an archive path to the current set of classpath entries. It adds the
+   * archive to cache as well.
+   * 
+   * @param archive Path of the archive to be added
+   */
+  public void addArchiveToClassPath(Path archive)
+    throws IOException {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addArchiveToClassPath(archive, conf);
+  }
+
+  /**
+   * This method allows you to create symlinks in the current working directory
+   * of the task to all the cache files/archives
+   */
+  public void createSymlink() {
+    ensureState(JobState.DEFINE);
+    DistributedCache.createSymlink(conf);
+  }
+  
+  /** 
+   * Expert: Set the number of maximum attempts that will be made to run a
+   * map task.
+   * 
+   * @param n the number of attempts per map task.
+   */
+  public void setMaxMapAttempts(int n) {
+    ensureState(JobState.DEFINE);
+    conf.setMaxMapAttempts(n);
+  }
+
+  /** 
+   * Expert: Set the number of maximum attempts that will be made to run a
+   * reduce task.
+   * 
+   * @param n the number of attempts per reduce task.
+   */
+  public void setMaxReduceAttempts(int n) {
+    ensureState(JobState.DEFINE);
+    conf.setMaxReduceAttempts(n);
+  }
+
+  /**
    * Get the URL where some job progress information will be displayed.
    * 
    * @return the URL where some job progress information will be displayed.
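
For reference, a driver-side sketch exercising the Job methods added above; the class name and paths are hypothetical, and the usual mapper/input/output wiring is elided:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        // The files/archives are assumed to already exist on the job's FileSystem.
        job.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"));
        job.addCacheArchive(new URI("/myapp/map.zip"));
        job.addFileToClassPath(new Path("/myapp/mylib.jar"));
        job.createSymlink();        // expose the #fragment names in the task working directory
        job.setMaxMapAttempts(1);   // expert setting; the updated test uses it to fail fast
        // ... setMapperClass, input/output formats, etc. elided ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }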

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Mon Sep  7 11:05:49 2009
@@ -19,11 +19,13 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
@@ -243,5 +245,108 @@
     return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
   }
 
+  /**
+   * This method checks to see if symlinks are to be created for the
+   * localized cache files in the current working directory
+   * @return true if symlinks are to be created, else return false
+   */
+  public boolean getSymlink() {
+    return DistributedCache.getSymlink(conf);
+  }
+  
+  /**
+   * Get the archive entries in classpath as an array of Path
+   */
+  public Path[] getArchiveClassPaths() {
+    return DistributedCache.getArchiveClassPaths(conf);
+  }
+
+  /**
+   * Get cache archives set in the Configuration
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public URI[] getCacheArchives() throws IOException {
+    return DistributedCache.getCacheArchives(conf);
+  }
+
+  /**
+   * Get cache files set in the Configuration
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+
+  public URI[] getCacheFiles() throws IOException {
+    return DistributedCache.getCacheFiles(conf);
+  }
+
+  /**
+   * Return the path array of the localized caches
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public Path[] getLocalCacheArchives()
+    throws IOException {
+    return DistributedCache.getLocalCacheArchives(conf);
+  }
+
+  /**
+   * Return the path array of the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public Path[] getLocalCacheFiles()
+    throws IOException {
+    return DistributedCache.getLocalCacheFiles(conf);
+  }
+
+  /**
+   * Get the file entries in classpath as an array of Path
+   */
+  public Path[] getFileClassPaths() {
+    return DistributedCache.getFileClassPaths(conf);
+  }
+  
+  /**
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public String[] getArchiveTimestamps() {
+    return DistributedCache.getArchiveTimestamps(conf);
+  }
+
+  /**
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public String[] getFileTimestamps() {
+    return DistributedCache.getFileTimestamps(conf);
+  }
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * map task, as specified by the <code>mapred.map.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   *  
+   * @return the max number of attempts per map task.
+   */
+  public int getMaxMapAttempts() {
+    return conf.getMaxMapAttempts();
+  }
+
+  /** 
+   * Get the configured number of maximum attempts  that will be made to run a
+   * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   * 
+   * @return the max number of attempts per reduce task.
+   */
+  public int getMaxReduceAttempts() {
+    return conf.getMaxReduceAttempts();
+  }
 
 }
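
A task-side counterpart: a sketch of a mapper that reads the localized cache through its context instead of the DistributedCache statics (the class name and key/value types are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CacheAwareMapper extends Mapper<LongWritable, Text, Text, Text> {
      private Path[] localFiles;
      private Path[] localArchives;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        // Both getters delegate to the (now deprecated) DistributedCache statics.
        localFiles = context.getLocalCacheFiles();
        localArchives = context.getLocalCacheArchives();
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // Consult the side data in localFiles/localArchives here before emitting.
        context.write(new Text(key.toString()), value);
      }
    }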

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Mon Sep  7 11:05:49 2009
@@ -24,6 +24,8 @@
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 
 import java.net.URI;
 
@@ -125,6 +127,7 @@
  * @see org.apache.hadoop.mapred.JobConf
  * @see org.apache.hadoop.mapred.JobClient
  */
+@Deprecated
 public class DistributedCache {
   /**
    * Get the locally cached file or archive; it could either be 
@@ -148,9 +151,10 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
@@ -185,9 +189,10 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static Path getLocalCache(URI cache, Configuration conf, 
       Path baseDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
@@ -219,9 +224,10 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework.  
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, boolean isArchive,
                                    long confFileStamp, Path currentWorkDir) 
@@ -238,15 +244,16 @@
    * @param conf configuration which contains the filesystem the cache 
    * is contained in.
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static void releaseCache(URI cache, Configuration conf)
       throws IOException {
     new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
   }
   
-  /*
+  /**
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
    * hdfs://hostname:port/absolute_path -- the relative path is
@@ -256,6 +263,7 @@
    * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
    * instead.
    */
+  @Deprecated
   public static String makeRelative(URI cache, Configuration conf)
       throws IOException {
     return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
@@ -268,13 +276,13 @@
    * @param cache cache file 
    * @return mtime of a given cache file on hdfs
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  
+   * Use {@link TrackerDistributedCacheManager} instead.
    */
+  @Deprecated
   public static long getTimestamp(Configuration conf, URI cache)
     throws IOException {
-    FileSystem fileSystem = FileSystem.get(cache, conf);
-    Path filePath = new Path(cache.getPath());
-
-    return fileSystem.getFileStatus(filePath).getModificationTime();
+    return TrackerDistributedCacheManager.getTimestamp(conf, cache);
   }
 
   /**
@@ -286,6 +294,7 @@
    * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
    * instead.
    */
+  @Deprecated
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
       throws IOException{
     TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
@@ -296,7 +305,9 @@
    * to be used by user code.
    * @param archives The list of archives that need to be localized
    * @param conf Configuration which will be changed
+   * @deprecated Use {@link Job#setCacheArchives(URI[])} instead
    */
+  @Deprecated
   public static void setCacheArchives(URI[] archives, Configuration conf) {
     String sarchives = StringUtils.uriToString(archives);
     conf.set("mapred.cache.archives", sarchives);
@@ -307,7 +318,9 @@
    * used by user code.
    * @param files The list of files that need to be localized
    * @param conf Configuration which will be changed
+   * @deprecated Use {@link Job#setCacheFiles(URI[])} instead
    */
+  @Deprecated
   public static void setCacheFiles(URI[] files, Configuration conf) {
     String sfiles = StringUtils.uriToString(files);
     conf.set("mapred.cache.files", sfiles);
@@ -319,7 +332,9 @@
    * @param conf The configuration which contains the archives
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
+   * @deprecated Use {@link JobContext#getCacheArchives()} instead
    */
+  @Deprecated
   public static URI[] getCacheArchives(Configuration conf) throws IOException {
     return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
   }
@@ -330,7 +345,9 @@
    * @param conf The configuration which contains the files
    * @return A URI array of the files set in the Configuration
    * @throws IOException
+   * @deprecated Use {@link JobContext#getCacheFiles()} instead
    */
+  @Deprecated
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
   }
@@ -341,7 +358,9 @@
    * @param conf Configuration that contains the localized archives
    * @return A path array of localized caches
    * @throws IOException
+   * @deprecated Use {@link JobContext#getLocalCacheArchives()} instead
    */
+  @Deprecated
   public static Path[] getLocalCacheArchives(Configuration conf)
     throws IOException {
     return StringUtils.stringToPath(conf
@@ -354,7 +373,9 @@
    * @param conf Configuration that contains the localized files
    * @return A path array of localized files
    * @throws IOException
+   * @deprecated Use {@link JobContext#getLocalCacheFiles()} instead
    */
+  @Deprecated
   public static Path[] getLocalCacheFiles(Configuration conf)
     throws IOException {
     return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
@@ -366,7 +387,9 @@
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
+   * @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
    */
+  @Deprecated
   public static String[] getArchiveTimestamps(Configuration conf) {
     return conf.getStrings("mapred.cache.archives.timestamps");
   }
@@ -378,7 +401,9 @@
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
+   * @deprecated Use {@link JobContext#getFileTimestamps()} instead
    */
+  @Deprecated
   public static String[] getFileTimestamps(Configuration conf) {
     return conf.getStrings("mapred.cache.files.timestamps");
   }
@@ -389,9 +414,13 @@
    * @param conf Configuration which stores the timestamp's
    * @param timestamps comma separated list of timestamps of archives.
    * The order should be the same as the order in which the archives are added.
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setArchiveTimestamps(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setArchiveTimestamps(Configuration conf, String timestamps) {
-    conf.set("mapred.cache.archives.timestamps", timestamps);
+    TrackerDistributedCacheManager.setArchiveTimestamps(conf, timestamps);
   }
 
   /**
@@ -400,9 +429,13 @@
    * @param conf Configuration which stores the timestamp's
    * @param timestamps comma separated list of timestamps of files.
    * The order should be the same as the order in which the files are added.
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setFileTimestamps(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setFileTimestamps(Configuration conf, String timestamps) {
-    conf.set("mapred.cache.files.timestamps", timestamps);
+    TrackerDistributedCacheManager.setFileTimestamps(conf, timestamps);
   }
   
   /**
@@ -410,9 +443,13 @@
    * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local archives
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setLocalArchives(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setLocalArchives(Configuration conf, String str) {
-    conf.set("mapred.cache.localArchives", str);
+    TrackerDistributedCacheManager.setLocalArchives(conf, str);
   }
 
   /**
@@ -420,9 +457,13 @@
    * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local files
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setLocalFiles(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setLocalFiles(Configuration conf, String str) {
-    conf.set("mapred.cache.localFiles", str);
+    TrackerDistributedCacheManager.setLocalFiles(conf, str);
   }
 
   /**
@@ -430,7 +471,9 @@
    * be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
+   * @deprecated Use {@link Job#addCacheArchive(URI)} instead
    */
+  @Deprecated
   public static void addCacheArchive(URI uri, Configuration conf) {
     String archives = conf.get("mapred.cache.archives");
     conf.set("mapred.cache.archives", archives == null ? uri.toString()
@@ -442,7 +485,9 @@
    * to be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
+   * @deprecated Use {@link Job#addCacheFile(URI)} instead
    */
+  @Deprecated
   public static void addCacheFile(URI uri, Configuration conf) {
     String files = conf.get("mapred.cache.files");
     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
@@ -455,7 +500,9 @@
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link Job#addFileToClassPath(Path)} instead
    */
+  @Deprecated
   public static void addFileToClassPath(Path file, Configuration conf)
     throws IOException {
     String classpath = conf.get("mapred.job.classpath.files");
@@ -472,7 +519,9 @@
    * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link JobContext#getFileClassPaths()} instead 
    */
+  @Deprecated
   public static Path[] getFileClassPaths(Configuration conf) {
     ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
                                 "mapred.job.classpath.files");
@@ -492,7 +541,9 @@
    * 
    * @param archive Path of the archive to be added
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link Job#addArchiveToClassPath(Path)} instead
    */
+  @Deprecated
   public static void addArchiveToClassPath(Path archive, Configuration conf)
     throws IOException {
     String classpath = conf.get("mapred.job.classpath.archives");
@@ -509,7 +560,9 @@
    * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link JobContext#getArchiveClassPaths()} instead 
    */
+  @Deprecated
   public static Path[] getArchiveClassPaths(Configuration conf) {
     ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
                                 "mapred.job.classpath.archives");
@@ -527,8 +580,10 @@
    * This method allows you to create symlinks in the current working directory
    * of the task to all the cache files/archives.
    * Intended to be used by user code.
-   * @param conf the jobconf 
+   * @param conf the jobconf
+   * @deprecated Use {@link Job#createSymlink()} instead  
    */
+  @Deprecated
   public static void createSymlink(Configuration conf){
     conf.set("mapred.create.symlink", "yes");
   }
@@ -539,7 +594,9 @@
    * Used by internal DistributedCache code.
    * @param conf the jobconf
    * @return true if symlinks are to be created- else return false
+   * @deprecated Use {@link JobContext#getSymlink()} instead
    */
+  @Deprecated
   public static boolean getSymlink(Configuration conf){
     String result = conf.get("mapred.create.symlink");
     if ("yes".equals(result)){
@@ -555,52 +612,22 @@
    * the various archives and files.  May be used by user code.
    * @param uriFiles The uri array of urifiles
    * @param uriArchives the uri array of uri archives
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#checkURIs(URI[], URI[])} instead
    */
+  @Deprecated
   public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
-    if ((uriFiles == null) && (uriArchives == null)){
-      return true;
-    }
-    if (uriFiles != null){
-      for (int i = 0; i < uriFiles.length; i++){
-        String frag1 = uriFiles[i].getFragment();
-        if (frag1 == null)
-          return false;
-        for (int j=i+1; j < uriFiles.length; j++){
-          String frag2 = uriFiles[j].getFragment();
-          if (frag2 == null)
-            return false;
-          if (frag1.equalsIgnoreCase(frag2))
-            return false;
-        }
-        if (uriArchives != null){
-          for (int j = 0; j < uriArchives.length; j++){
-            String frag2 = uriArchives[j].getFragment();
-            if (frag2 == null){
-              return false;
-            }
-            if (frag1.equalsIgnoreCase(frag2))
-              return false;
-            for (int k=j+1; k < uriArchives.length; k++){
-              String frag3 = uriArchives[k].getFragment();
-              if (frag3 == null)
-                return false;
-              if (frag2.equalsIgnoreCase(frag3))
-                return false;
-            }
-          }
-        }
-      }
-    }
-    return true;
+    return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
   }
 
   /**
    * Clear the entire contents of the cache and delete the backing files. This
    * should only be used when the server is reinitializing, because the users
    * are going to lose their files.
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static void purgeCache(Configuration conf) throws IOException {
     new TrackerDistributedCacheManager(conf).purgeCache();
   }
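
Because the new Job and JobContext methods delegate to these (now deprecated) statics, old and new call sites write the same configuration keys and stay interchangeable. A small fragment, assuming it runs in a method that declares the checked exceptions:

    // Both calls append to the same "mapred.cache.files" property.
    Job job = new Job(new Configuration());
    job.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"));            // new API
    DistributedCache.addCacheFile(                                        // deprecated path, same effect
        new URI("/myapp/extra.dat#extra.dat"), job.getConfiguration());
    System.out.println(job.getConfiguration().get("mapred.cache.files"));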

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Mon Sep  7 11:05:49 2009
@@ -174,11 +174,12 @@
 
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
-      DistributedCache.setLocalArchives(taskConf, 
+      TrackerDistributedCacheManager.setLocalArchives(taskConf, 
         stringifyPathList(localArchives));
     }
     if (!localFiles.isEmpty()) {
-      DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+      TrackerDistributedCacheManager.setLocalFiles(taskConf,
+        stringifyPathList(localFiles));
     }
 
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Mon Sep  7 11:05:49 2009
@@ -27,13 +27,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.RunJar;
 
 /**
@@ -208,6 +206,22 @@
     return path;
   }
 
+  /**
+   * Returns mtime of a given cache file on hdfs.
+   * 
+   * @param conf configuration
+   * @param cache cache file 
+   * @return mtime of a given cache file on hdfs
+   * @throws IOException
+   */
+  static long getTimestamp(Configuration conf, URI cache)
+    throws IOException {
+    FileSystem fileSystem = FileSystem.get(cache, conf);
+    Path filePath = new Path(cache.getPath());
+
+    return fileSystem.getFileStatus(filePath).getModificationTime();
+  }
+
   private Path cacheFilePath(Path p) {
     return new Path(p, p.getName());
   }
@@ -324,7 +338,7 @@
 
       // update cacheStatus to reflect the newly cached file
       cacheStatus.currentStatus = true;
-      cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
+      cacheStatus.mtime = getTimestamp(conf, cache);
 
       LOG.info(String.format("Cached %s as %s",
           cache.toString(), cacheStatus.localLoadPath));
@@ -367,7 +381,7 @@
       if (fileStatus != null) {
         dfsFileStamp = fileStatus.getModificationTime();
       } else {
-        dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
+        dfsFileStamp = getTimestamp(conf, cache);
       }
 
       // ensure that the file on hdfs hasn't been modified since the job started
@@ -488,25 +502,113 @@
     if (tarchives != null) {
       StringBuffer archiveTimestamps = 
         new StringBuffer(String.valueOf(
-            DistributedCache.getTimestamp(job, tarchives[0])));
+            getTimestamp(job, tarchives[0])));
       for (int i = 1; i < tarchives.length; i++) {
         archiveTimestamps.append(",");
         archiveTimestamps.append(String.valueOf(
-            DistributedCache.getTimestamp(job, tarchives[i])));
+            getTimestamp(job, tarchives[i])));
       }
-      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
+      setArchiveTimestamps(job, archiveTimestamps.toString());
     }
   
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
       StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
-          DistributedCache.getTimestamp(job, tfiles[0])));
+          getTimestamp(job, tfiles[0])));
       for (int i = 1; i < tfiles.length; i++) {
         fileTimestamps.append(",");
         fileTimestamps.append(String.valueOf(
-            DistributedCache.getTimestamp(job, tfiles[i])));
+            getTimestamp(job, tfiles[i])));
+      }
+      setFileTimestamps(job, fileTimestamps.toString());
+    }
+  }
+  
+  /**
+   * This method checks if there is a conflict in the fragment names 
+   * of the uris. Also makes sure that each uri has a fragment. It 
+   * is only to be called if you want to create symlinks for 
+   * the various archives and files.  May be used by user code.
+   * @param uriFiles The uri array of urifiles
+   * @param uriArchives the uri array of uri archives
+   */
+  public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
+    if ((uriFiles == null) && (uriArchives == null)){
+      return true;
+    }
+    if (uriFiles != null){
+      for (int i = 0; i < uriFiles.length; i++){
+        String frag1 = uriFiles[i].getFragment();
+        if (frag1 == null)
+          return false;
+        for (int j=i+1; j < uriFiles.length; j++){
+          String frag2 = uriFiles[j].getFragment();
+          if (frag2 == null)
+            return false;
+          if (frag1.equalsIgnoreCase(frag2))
+            return false;
+        }
+        if (uriArchives != null){
+          for (int j = 0; j < uriArchives.length; j++){
+            String frag2 = uriArchives[j].getFragment();
+            if (frag2 == null){
+              return false;
+            }
+            if (frag1.equalsIgnoreCase(frag2))
+              return false;
+            for (int k=j+1; k < uriArchives.length; k++){
+              String frag3 = uriArchives[k].getFragment();
+              if (frag3 == null)
+                return false;
+              if (frag2.equalsIgnoreCase(frag3))
+                return false;
+            }
+          }
+        }
       }
-      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
     }
+    return true;
+  }
+
+  /**
+   * This is to check the timestamp of the archives to be localized.
+   * 
+   * @param conf Configuration which stores the timestamp's
+   * @param timestamps comma separated list of timestamps of archives.
+   * The order should be the same as the order in which the archives are added.
+   */
+  static void setArchiveTimestamps(Configuration conf, String timestamps) {
+    conf.set("mapred.cache.archives.timestamps", timestamps);
+  }
+
+  /**
+   * This is to check the timestamp of the files to be localized.
+   * 
+   * @param conf Configuration which stores the timestamp's
+   * @param timestamps comma separated list of timestamps of files.
+   * The order should be the same as the order in which the files are added.
+   */
+  static void setFileTimestamps(Configuration conf, String timestamps) {
+    conf.set("mapred.cache.files.timestamps", timestamps);
+  }
+  
+  /**
+   * Set the conf to contain the location for localized archives.
+   * 
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local archives
+   */
+  static void setLocalArchives(Configuration conf, String str) {
+    conf.set("mapred.cache.localArchives", str);
+  }
+
+  /**
+   * Set the conf to contain the location for localized files.
+   * 
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local files
+   */
+  static void setLocalFiles(Configuration conf, String str) {
+    conf.set("mapred.cache.localFiles", str);
   }
 }
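
The fragment rules enforced by checkURIs (every URI needs a fragment, and fragments must be unique, case-insensitively, across both files and archives) are easiest to see on a small example. The paths below are made up; DistributedCache.checkURIs now simply delegates to the method above:

    import java.net.URI;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class CheckUrisDemo {
      public static void main(String[] args) throws Exception {
        URI[] files    = { new URI("/myapp/lookup.dat#lookup.dat") };
        URI[] archives = { new URI("/myapp/map.zip#map.zip") };
        // Distinct, non-null fragments everywhere: accepted.
        System.out.println(DistributedCache.checkURIs(files, archives));   // true

        URI[] clashing = { new URI("/myapp/other.zip#lookup.dat") };
        // An archive fragment collides (case-insensitively) with a file fragment: rejected.
        System.out.println(DistributedCache.checkURIs(files, clashing));   // false

        URI[] bare = { new URI("/myapp/map.zip") };
        // A URI without a fragment cannot be symlinked: rejected.
        System.out.println(DistributedCache.checkURIs(files, bare));       // false
      }
    }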

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=812079&r1=812078&r2=812079&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Mon Sep  7 11:05:49 2009
@@ -31,7 +31,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -80,8 +79,8 @@
     @Override
     public void setup(Context context) throws IOException {
       Configuration conf = context.getConfiguration();
-      Path[] files = DistributedCache.getLocalCacheFiles(conf);
-      Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+      Path[] files = context.getLocalCacheFiles();
+      Path[] archives = context.getLocalCacheArchives();
       FileSystem fs = LocalFileSystem.get(conf);
 
       // Check that 2 files and 2 archives are present
@@ -121,7 +120,7 @@
     }
   }
 
-  private void testWithConf(JobConf conf) throws IOException,
+  private void testWithConf(Configuration conf) throws IOException,
       InterruptedException, ClassNotFoundException, URISyntaxException {
     // Create a temporary file of length 1.
     Path first = createTempFile("distributed.first", "x");
@@ -133,20 +132,19 @@
     Path fourth =
         makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
 
-    // Creates the Job Configuration
-    DistributedCache.addCacheFile(
-        new URI(first.toUri().toString() + "#distributed.first.symlink"),
-        conf);
-    DistributedCache.addFileToClassPath(second, conf);
-    DistributedCache.addArchiveToClassPath(third, conf);
-    DistributedCache.addCacheArchive(fourth.toUri(), conf);
-    DistributedCache.createSymlink(conf);
 
-    conf.setMaxMapAttempts(1); // speed up failures
     Job job = new Job(conf);
     job.setMapperClass(DistributedCacheChecker.class);
     job.setOutputFormatClass(NullOutputFormat.class);
     FileInputFormat.setInputPaths(job, first);
+    // Creates the Job Configuration
+    job.addCacheFile(
+      new URI(first.toUri().toString() + "#distributed.first.symlink"));
+    job.addFileToClassPath(second);
+    job.addArchiveToClassPath(third);
+    job.addCacheArchive(fourth.toUri());
+    job.createSymlink();
+    job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();
     assertTrue(job.waitForCompletion(false));
@@ -154,7 +152,7 @@
 
   /** Tests using the local job runner. */
   public void testLocalJobRunner() throws Exception {
-    JobConf c = new JobConf();
+    Configuration c = new Configuration();
     c.set("mapred.job.tracker", "local");
     c.set("fs.default.name", "file:///");
     testWithConf(c);


