hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r921780 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapred/
Date Thu, 11 Mar 2010 10:27:17 GMT
Author: cdouglas
Date: Thu Mar 11 10:27:17 2010
New Revision: 921780

URL: http://svn.apache.org/viewvc?rev=921780&view=rev
Log:
MAPREDUCE-1403. Save the size and number of distributed cache artifacts in the
configuration. Contributed by Arun Murthy

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=921780&r1=921779&r2=921780&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Mar 11 10:27:17 2010
@@ -206,7 +206,10 @@ Trunk (unreleased changes)
     Singh via cdouglas)
 
     MAPREDUCE-1527. Better warning logged when mapred.queue.names is
-    overshadowed by mapred-queues.xml. (Hong Tang via acmurthy) 
+    overshadowed by mapred-queues.xml. (Hong Tang via acmurthy)
+
+    MAPREDUCE-1403. Save the size and number of distributed cache artifacts in
+    the configuration. (Arun Murthy via cdouglas)
 
   OPTIMIZATIONS
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=921780&r1=921779&r2=921780&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Mar 11 10:27:17 2010
@@ -163,6 +163,10 @@ public class JobClient extends CLI {
       this.job = job;
     }
 
+    public Configuration getConfiguration() {
+      return job.getConfiguration();
+    }
+
     /**
      * An identifier for the job
      */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=921780&r1=921779&r2=921780&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Thu Mar 11 10:27:17 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
 
 
 /** 
@@ -34,6 +35,14 @@ import java.io.IOException;
  */
 @Deprecated
 public interface RunningJob {
+
+  /**
+   * Get the underlying job configuration
+   *
+   * @return the configuration of the job.
+   */
+  public Configuration getConfiguration();
+
   /**
    * Get the job identifier.
    * 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=921780&r1=921779&r2=921780&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Thu Mar 11 10:27:17 2010
@@ -99,7 +99,11 @@ public interface JobContext {
   public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
   public static final String CACHE_FILES = "mapreduce.job.cache.files";
   public static final String CACHE_ARCHIVES = "mapreduce.job.cache.archives";
-  public static final String CACHE_LOCALFILES = 
+  public static final String CACHE_FILES_SIZES =
+    "mapreduce.job.cache.files.filesizes";    // internal use only
+  public static final String CACHE_ARCHIVES_SIZES =
+    "mapreduce.job.cache.archives.filesizes"; // ditto
+  public static final String CACHE_LOCALFILES =
     "mapreduce.job.cache.local.files";
   public static final String CACHE_LOCALARCHIVES = 
     "mapreduce.job.cache.local.archives";

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=921780&r1=921779&r2=921780&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Mar 11 10:27:17 2010
@@ -30,7 +30,6 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -341,21 +340,34 @@ public class TrackerDistributedCacheMana
   }
   
   /**
-   * Returns mtime of a given cache file on hdfs.
+   * Returns {@link FileStatus} of a given cache file on hdfs.
    * 
    * @param conf configuration
    * @param cache cache file 
-   * @return mtime of a given cache file on hdfs
+   * @return {@link FileStatus} of a given cache file on hdfs
    * @throws IOException
    */
-  static long getTimestamp(Configuration conf, URI cache)
+  static FileStatus getFileStatus(Configuration conf, URI cache)
     throws IOException {
     FileSystem fileSystem = FileSystem.get(cache, conf);
     Path filePath = new Path(cache.getPath());
 
-    return fileSystem.getFileStatus(filePath).getModificationTime();
+    return fileSystem.getFileStatus(filePath);
   }
-  
+
+  /**
+   * Returns mtime of a given cache file on hdfs.
+   *
+   * @param conf configuration
+   * @param cache cache file
+   * @return mtime of a given cache file on hdfs
+   * @throws IOException
+   */
+  static long getTimestamp(Configuration conf, URI cache)
+    throws IOException {
+    return getFileStatus(conf, cache).getModificationTime();
+  }
+
   /**
    * Returns a boolean to denote whether a cache file is visible to all(public)
    * or not
@@ -687,26 +699,37 @@ public class TrackerDistributedCacheMana
   public static void determineTimestamps(Configuration job) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
-      StringBuffer archiveTimestamps = 
-        new StringBuffer(String.valueOf(
-            getTimestamp(job, tarchives[0])));
+      FileStatus status = getFileStatus(job, tarchives[0]);
+      StringBuilder archiveFileSizes =
+        new StringBuilder(String.valueOf(status.getLen()));
+      StringBuilder archiveTimestamps =
+        new StringBuilder(String.valueOf(status.getModificationTime()));
       for (int i = 1; i < tarchives.length; i++) {
+        status = getFileStatus(job, tarchives[i]);
+        archiveFileSizes.append(",");
+        archiveFileSizes.append(String.valueOf(status.getLen()));
         archiveTimestamps.append(",");
-        archiveTimestamps.append(String.valueOf(
-            getTimestamp(job, tarchives[i])));
+        archiveTimestamps.append(String.valueOf(status.getModificationTime()));
       }
+      job.set(JobContext.CACHE_ARCHIVES_SIZES, archiveFileSizes.toString());
       setArchiveTimestamps(job, archiveTimestamps.toString());
     }
   
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
-      StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
-          getTimestamp(job, tfiles[0])));
+      FileStatus status = getFileStatus(job, tfiles[0]);
+      StringBuilder fileSizes =
+        new StringBuilder(String.valueOf(status.getLen()));
+      StringBuilder fileTimestamps = new StringBuilder(String.valueOf(
+        status.getModificationTime()));
       for (int i = 1; i < tfiles.length; i++) {
+        status = getFileStatus(job, tfiles[i]);
+        fileSizes.append(",");
+        fileSizes.append(String.valueOf(status.getLen()));
         fileTimestamps.append(",");
-        fileTimestamps.append(String.valueOf(
-            getTimestamp(job, tfiles[i])));
+        fileTimestamps.append(String.valueOf(status.getModificationTime()));
       }
+      job.set(JobContext.CACHE_FILES_SIZES, fileSizes.toString());
       setFileTimestamps(job, fileTimestamps.toString());
     }
   }
@@ -751,8 +774,8 @@ public class TrackerDistributedCacheMana
   throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
-      StringBuffer archiveVisibilities = 
-        new StringBuffer(String.valueOf(isPublic(job, tarchives[0])));
+      StringBuilder archiveVisibilities =
+        new StringBuilder(String.valueOf(isPublic(job, tarchives[0])));
       for (int i = 1; i < tarchives.length; i++) {
         archiveVisibilities.append(",");
         archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
@@ -761,8 +784,8 @@ public class TrackerDistributedCacheMana
     }
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
-      StringBuffer fileVisibilities = 
-        new StringBuffer(String.valueOf(isPublic(job, tfiles[0])));
+      StringBuilder fileVisibilities =
+        new StringBuilder(String.valueOf(isPublic(job, tfiles[0])));
       for (int i = 1; i < tfiles.length; i++) {
         fileVisibilities.append(",");
         fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java?rev=921780&r1=921779&r2=921780&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java Thu Mar 11 10:27:17 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.*;
 import java.util.*;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -34,9 +35,12 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.JobContext;
 
 import java.net.URI;
 
+import junit.framework.Assert;
+
 public class MRCaching {
   static String testStr = "This is a test file " + "used for testing caching "
     + "jars, zip and normal files.";
@@ -271,8 +275,16 @@ public class MRCaching {
       uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
     }
     DistributedCache.addCacheFile(uris[0], conf);
+
+    // Save expected file sizes
+    long[] fileSizes = new long[1];
+    fileSizes[0] = fs.getFileStatus(new Path(uris[0].getPath())).getLen();
+
+    long[] archiveSizes = new long[5]; // track last 5
     for (int i = 1; i < 6; i++) {
       DistributedCache.addCacheArchive(uris[i], conf);
+      archiveSizes[i-1] = // starting with second archive
+        fs.getFileStatus(new Path(uris[i].getPath())).getLen();
     }
     RunningJob job = JobClient.runJob(conf);
     int count = 0;
@@ -295,7 +307,32 @@ public class MRCaching {
     if (count != 6)
       return new TestResult(job, false);
 
+    // Check to ensure the filesizes of files in DC were correctly saved.
+    // Note, the underlying job clones the original conf before determine
+    // various stats (timestamps etc.), so we have to getConfiguration here.
+    validateCacheFileSizes(job.getConfiguration(), fileSizes,
+                           JobContext.CACHE_FILES_SIZES);
+    validateCacheFileSizes(job.getConfiguration(), archiveSizes,
+                           JobContext.CACHE_ARCHIVES_SIZES);
+
     return new TestResult(job, true);
 
   }
+
+  private static void validateCacheFileSizes(Configuration job,
+                                             long[] expectedSizes,
+                                             String configKey)
+  throws IOException {
+    String configValues = job.get(configKey, "");
+    System.out.println(configKey + " -> " + configValues);
+    String[] realSizes = StringUtils.getStrings(configValues);
+    Assert.assertEquals("Number of files for "+ configKey,
+                        expectedSizes.length, realSizes.length);
+
+    for (int i=0; i < expectedSizes.length; ++i) {
+      long actual = Long.valueOf(realSizes[i]);
+      long expected = expectedSizes[i];
+      Assert.assertEquals("File "+ i +" for "+ configKey, expected, actual);
+    }
+  }
 }



Mime
View raw message