hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r557790 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/filecache/DistributedCache.java src/java/org/apache/hadoop/mapred/JobClient.java src/java/org/apache/hadoop/mapred/TaskRunner.java
Date: Thu, 19 Jul 2007 22:01:15 GMT
Author: cutting
Date: Thu Jul 19 15:01:14 2007
New Revision: 557790

URL: http://svn.apache.org/viewvc?view=rev&rev=557790
Log:
HADOOP-1084.  Switch mapred file cache to use file modification time instead of checksum to
detect changes.  Contributed by Arun.
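
In effect, the freshness test becomes a pure timestamp comparison: the job client
records each cache file's HDFS modification time at submission, and the task side
compares that stamp against both the file's current mtime and the mtime recorded
when the file was last localized. A minimal sketch of that check, using the same
FileSystem calls as the patch but with hypothetical names (isFresh, cachedMtime)
that are not part of it:

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MtimeCheckSketch {
      /** Fresh iff the remote mtime still matches both recorded stamps. */
      static boolean isFresh(Configuration conf, URI cache,
                             long submitStamp, long cachedMtime)
          throws IOException {
        FileSystem fs = FileSystem.get(cache, conf);
        long now = fs.getFileStatus(new Path(cache.getPath()))
                     .getModificationTime();
        if (now != submitStamp) {
          // the file changed under a running job: refuse, as the patch does
          throw new IOException(cache + " changed on HDFS since job started");
        }
        return now == cachedMtime;   // stale local copy => re-localize
      }
    }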

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=557790&r1=557789&r2=557790
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jul 19 15:01:14 2007
@@ -384,6 +384,10 @@
 
 119. HADOOP-1624.  Fix an infinite loop in datanode. (Raghu Angadi via cutting)
 
+120. HADOOP-1084.  Switch mapred file cache to use file modification
+     time instead of checksum to detect file changes, as checksums are
+     no longer easily accessed.  (Arun C Murthy via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=557790&r1=557789&r2=557790
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Thu Jul 19 15:01:14 2007
@@ -25,8 +25,6 @@
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.net.URI;
 
 /*******************************************************************************
@@ -37,12 +35,16 @@
 public class DistributedCache {
   // cacheID to cacheStatus mapping
   private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
-  // buffer size for reading checksum files
-  private static final int CRC_BUFFER_SIZE = 64 * 1024;
+  
+  // default total cache size
+  private static final long DEFAULT_CACHE_SIZE = 1048576L;
+
   private static final Log LOG =
     LogFactory.getLog(DistributedCache.class);
   
   /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
    * new URI(hdfs://hostname:port/absoulte_path_to_file#LINKNAME). If no schema 
@@ -54,9 +56,8 @@
    *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
    *  and the directory where the archive is unjarred is returned as the Path.
    *  In case of a file, the path to the file is returned
-   * @param md5 this is a mere checksum to verufy if you are using the right cache. 
-   * You need to pass the md5 of the crc file in DFS. This is matched against the one
-   * calculated in this api and if it does not match, the cache is not localized.
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the
+   * file to be cached hasn't changed since the job started
   * @param currentWorkDir this is the directory where you would want to create symlinks
   * for the locally cached files/archives
    * @return the path to directory where the archives are unjarred in case of archives,
@@ -64,7 +65,8 @@
    * @throws IOException
    */
   public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-                                   boolean isArchive, String md5, Path currentWorkDir) throws IOException {
+                                   boolean isArchive, long confFileStamp, Path currentWorkDir)
+  throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
     Path localizedPath;
@@ -72,16 +74,13 @@
       lcacheStatus = cachedArchives.get(cacheId);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus();
-        lcacheStatus.currentStatus = false;
-        lcacheStatus.refcount = 0;
-        lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId));
+        lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
         cachedArchives.put(cacheId, lcacheStatus);
       }
       
       synchronized (lcacheStatus) {
-        localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, 
-                                      md5, currentWorkDir);
+        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
+                                      isArchive, currentWorkDir);
         lcacheStatus.refcount++;
       }
     }
@@ -89,7 +88,7 @@
     // try deleting stuff if you can
     long size = FileUtil.getDU(new File(baseDir.toString()));
     // setting the cache size to a default of 1MB
-    long allowedSize = conf.getLong("local.cache.size", 1048576L);
+    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
     if (allowedSize < size) {
       // try some cache deletions
       deleteCache(conf);
@@ -163,16 +162,18 @@
     return new Path(p, p.getName());
   }
 
-  // the methoed which actually copies the caches locally and unjars/unzips them
-  private static Path localizeCache(URI cache, CacheStatus cacheStatus,
-                                    Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException {
-    boolean b = true;
+  // the method which actually copies the caches locally and unjars/unzips them
+  private static Path localizeCache(Configuration conf, 
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    boolean isArchive, 
+                                    Path currentWorkDir) 
+  throws IOException {
     boolean doSymlink = getSymlink(conf);
-    FileSystem dfs = FileSystem.get(cache, conf);
-    b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
+    FileSystem fs = getFileSystem(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (b) {
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus)) {
       if (isArchive) {
         if (doSymlink){
           if (!flink.exists())
@@ -197,7 +198,7 @@
       if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
         throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
                               + " is in use and cannot be refreshed");
-      byte[] checkSum = createMD5(cache, conf);
+      
       FileSystem localFs = FileSystem.getLocal(conf);
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
@@ -206,11 +207,9 @@
         throw new IOException("Mkdirs failed to create directory " + 
                               cacheStatus.localLoadPath.toString());
       }
+
       String cacheId = cache.getPath();
-      dfs.copyToLocalFile(new Path(cacheId), parchive);
-      dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive
-                                                               .toString()
-                                                               + "_md5"));
+      fs.copyToLocalFile(new Path(cacheId), parchive);
       if (isArchive) {
         String tmpArchive = parchive.toString().toLowerCase();
         if (tmpArchive.endsWith(".jar")) {
@@ -224,10 +223,10 @@
         // else will not do anyhting
         // and copy the file into the dir as it is
       }
-      // create a symlink if #NAME is specified as fragment in the
-      // symlink
+
+      // update cacheStatus to reflect the newly cached file
       cacheStatus.currentStatus = true;
-      cacheStatus.md5 = checkSum;
+      cacheStatus.mtime = getTimestamp(conf, cache);
     }
     
     if (isArchive){
@@ -249,92 +248,45 @@
   }
 
   // Checks if the cache has already been localized and is fresh
-  private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache,
-                                          FileSystem dfs, String confMD5, Configuration conf) throws IOException {
-    // compute the md5 of the crc
-    byte[] digest = null;
-    byte[] fsDigest = createMD5(cache, conf);
-    byte[] confDigest = StringUtils.hexStringToByte(confMD5);
+  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
+                                          URI cache, long confFileStamp, 
+                                          CacheStatus lcacheStatus) 
+  throws IOException {
     // check for existence of the cache
     if (lcacheStatus.currentStatus == false) {
       return false;
     } else {
-      digest = lcacheStatus.md5;
-      if (!MessageDigest.isEqual(confDigest, fsDigest)) {
-        throw new IOException("Inconsistencty in data caching, "
-                              + "Cache archives have been changed");
-      } else {
-        if (!MessageDigest.isEqual(confDigest, digest)) {
-          // needs refreshing
-          return false;
-        } else {
-          // does not need any refreshing
-          return true;
-        }
+      long dfsFileStamp = getTimestamp(conf, cache);
+
+      // ensure that the file on hdfs hasn't been modified since the job started 
+      if (dfsFileStamp != confFileStamp) {
+        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+        throw new IOException("File: " + cache + 
+                              " has changed on HDFS since job started");
+      }
+      
+      if (dfsFileStamp != lcacheStatus.mtime) {
+        // needs refreshing
+        return false;
       }
     }
+    
+    return true;
   }
 
   /**
-   * Returns md5 of the checksum file for a given dfs file.
-   * This method also creates file filename_md5 existence of which
-   * signifies a new cache has been loaded into dfs. So if you want to
-   * refresh the cache, you need to delete this md5 file as well.
-   * @param cache The cache to get the md5 checksum for
+   * Returns mtime of a given cache file on hdfs.
    * @param conf configuration
-   * @return md5 of the crc of the cache parameter
+   * @param cache cache file 
+   * @return mtime of a given cache file on hdfs
    * @throws IOException
    */
-  public static byte[] createMD5(URI cache, Configuration conf)
+  public static long getTimestamp(Configuration conf, URI cache)
     throws IOException {
-    byte[] b = new byte[CRC_BUFFER_SIZE];
-    byte[] digest = null;
-
     FileSystem fileSystem = FileSystem.get(cache, conf);
-    if (!(fileSystem instanceof ChecksumFileSystem)) {
-      throw new IOException("Not a checksummed file system: "
-                            +fileSystem.getUri());
-    }
-    String filename = cache.getPath();
-    Path filePath = new Path(filename);
-    Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
-                            + filePath.getName() + "_md5");
-    MessageDigest md5 = null;
-    try {
-      md5 = MessageDigest.getInstance("MD5");
-    } catch (NoSuchAlgorithmException na) {
-      // do nothing
-    }
-    if (!fileSystem.exists(md5File)) {
-      ChecksumFileSystem checksumFs;
-      if (!(fileSystem instanceof ChecksumFileSystem)) {
-        throw new IOException(
-                              "Not a checksumed file system: "+fileSystem.getUri());
-      } else {
-        checksumFs = (ChecksumFileSystem)fileSystem;
-      }
-      FSDataInputStream fsStream = checksumFs.getRawFileSystem().open(
-                                                                      checksumFs.getChecksumFile(filePath));
-      int read = fsStream.read(b);
-      while (read != -1) {
-        md5.update(b, 0, read);
-        read = fsStream.read(b);
-      }
-      fsStream.close();
-      digest = md5.digest();
-
-      short replication = fileSystem.getReplication(filePath);
-      FSDataOutputStream out = fileSystem.create(md5File, replication);
-      out.write(digest);
-      out.close();
-    } else {
-      FSDataInputStream fsStream = fileSystem.open(md5File);
-      digest = new byte[md5.getDigestLength()];
-      fsStream.readFully(digest);
-      fsStream.close();
-    }
+    Path filePath = new Path(cache.getPath());
 
-    return digest;
+    return fileSystem.getFileStatus(filePath).getModificationTime();
   }
   
   /**
@@ -358,6 +310,26 @@
       }
     }  
   }
+  
+  private static String getFileSysName(URI url) {
+    String fsname = url.getScheme();
+    if ("hdfs".equals(fsname)) {
+      String host = url.getHost();
+      int port = url.getPort();
+      return (port == (-1)) ? host : (host + ":" + port);
+    } else {
+      return null;
+    }
+  }
+  
+  private static FileSystem getFileSystem(URI cache, Configuration conf)
+    throws IOException {
+    String fileSysName = getFileSysName(cache);
+    if (fileSysName != null)
+      return FileSystem.getNamed(fileSysName, conf);
+    else
+      return FileSystem.get(conf);
+  }
 
   /**
    * Set the configuration with the given set of archives
@@ -424,50 +396,50 @@
   }
 
   /**
-   * Get the md5 checksums of the archives
-   * @param conf The configuration which stored the md5's
-   * @return a string array of md5 checksums 
+   * Get the timestamps of the archives
+   * @param conf The configuration which stored the timestamps
+   * @return a string array of timestamps 
    * @throws IOException
    */
-  public static String[] getArchiveMd5(Configuration conf) throws IOException {
-    return conf.getStrings("mapred.cache.archivemd5");
+  public static String[] getArchiveTimestamps(Configuration conf) {
+    return conf.getStrings("mapred.cache.archives.timestamps");
   }
 
 
   /**
-   * Get the md5 checksums of the files
-   * @param conf The configuration which stored the md5's
-   * @return a string array of md5 checksums 
+   * Get the timestamps of the files
+   * @param conf The configuration which stored the timestamps
+   * @return a string array of timestamps 
    * @throws IOException
    */
-  public static String[] getFileMd5(Configuration conf) throws IOException {
-    return conf.getStrings("mapred.cache.filemd5");
+  public static String[] getFileTimestamps(Configuration conf) {
+    return conf.getStrings("mapred.cache.files.timestamps");
   }
 
   /**
-   * This is to check the md5 of the archives to be localized
-   * @param conf Configuration which stores the md5's
-   * @param md5 comma seperated list of md5 checksums of the .crc's of archives.
-   * The order should be the same as the order in which the archives are added
+   * This is to check the timestamp of the archives to be localized
+   * @param conf Configuration which stores the timestamp's
+   * @param timestamps comma separated list of timestamps of archives.
+   * The order should be the same as the order in which the archives are added.
    */
-  public static void setArchiveMd5(Configuration conf, String md5) {
-    conf.set("mapred.cache.archivemd5", md5);
+  public static void setArchiveTimestamps(Configuration conf, String timestamps) {
+    conf.set("mapred.cache.archives.timestamps", timestamps);
   }
 
   /**
-   * This is to check the md5 of the files to be localized
-   * @param conf Configuration which stores the md5's
-   * @param md5 comma seperated list of md5 checksums of the .crc's of files.
-   * The order should be the same as the order in which the files are added
+   * This is to check the timestamp of the files to be localized
+   * @param conf Configuration which stores the timestamp's
+   * @param timestamps comma separated list of timestamps of files.
+   * The order should be the same as the order in which the files are added.
    */
-  public static void setFileMd5(Configuration conf, String md5) {
-    conf.set("mapred.cache.filemd5", md5);
+  public static void setFileTimestamps(Configuration conf, String timestamps) {
+    conf.set("mapred.cache.files.timestamps", timestamps);
   }
   
   /**
    * Set the conf to contain the location for localized archives 
    * @param conf The conf to modify to contain the localized caches
-   * @param str a comma seperated list of local archives
+   * @param str a comma separated list of local archives
    */
   public static void setLocalArchives(Configuration conf, String str) {
     conf.set("mapred.cache.localArchives", str);
@@ -476,7 +448,7 @@
   /**
    * Set the conf to contain the location for localized files 
    * @param conf The conf to modify to contain the localized caches
-   * @param str a comma seperated list of local files
+   * @param str a comma separated list of local files
    */
   public static void setLocalFiles(Configuration conf, String str) {
     conf.set("mapred.cache.localFiles", str);
@@ -648,16 +620,24 @@
 
   private static class CacheStatus {
     // false, not loaded yet, true is loaded
-    public boolean currentStatus;
+    boolean currentStatus;
 
     // the local load path of this cache
-    public Path localLoadPath;
+    Path localLoadPath;
 
     // number of instances using this cache
-    public int refcount;
+    int refcount;
+
+    // the cache-file modification time
+    long mtime;
 
-    // The md5 checksum of the crc file of this cache
-    public byte[] md5;
+    public CacheStatus(Path localLoadPath) {
+      super();
+      this.currentStatus = false;
+      this.localLoadPath = localLoadPath;
+      this.refcount = 0;
+      this.mtime = -1;
+    }
   }
 
   /**

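The JobClient and TaskRunner changes below wire the new API together: the client
snapshots each cache file's mtime into the job configuration as a comma-separated
string, and each task parses the stamps back before localizing. A condensed sketch
of that round trip (the URI and values here are illustrative, not from the patch):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class TimestampRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Configuration job = new Configuration();
        URI cacheFile = new URI("hdfs://namenode:9000/user/me/lookup.dat");

        // submit side: record the current HDFS mtime in the job conf
        long stamp = DistributedCache.getTimestamp(job, cacheFile);
        DistributedCache.setFileTimestamps(job, String.valueOf(stamp));

        // task side: parse it back; getLocalCache compares it against the
        // file's current mtime before trusting a previously localized copy
        long confStamp =
          Long.parseLong(DistributedCache.getFileTimestamps(job)[0]);
        System.out.println("recorded mtime: " + confStamp);
      }
    }
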
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=557790&r1=557789&r2=557790
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Jul 19 15:01:14 2007
@@ -300,33 +300,27 @@
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
         
-    // try getting the md5 of the archives
+    // set the timestamps of the archives and files
     URI[] tarchives = DistributedCache.getCacheArchives(job);
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
-    if ((tarchives != null) || (tfiles != null)) {
-      // prepare these archives for md5 checksums
-      if (tarchives != null) {
-        StringBuffer md5Archives = 
-          new StringBuffer(StringUtils.byteToHexString(DistributedCache.createMD5(tarchives[0], job)));
-        for (int i = 1; i < tarchives.length; i++) {
-          md5Archives.append(",");
-          md5Archives.append(StringUtils.byteToHexString(DistributedCache
-                                          .createMD5(tarchives[i], job)));
-        }
-        DistributedCache.setArchiveMd5(job, md5Archives.toString());
-        //job.set("mapred.cache.archivemd5", md5Archives);
+    if (tarchives != null) {
+      StringBuffer archiveTimestamps = 
+        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
+      for (int i = 1; i < tarchives.length; i++) {
+        archiveTimestamps.append(",");
+        archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
       }
-      if (tfiles != null) {
-        StringBuffer md5Files = 
-          new StringBuffer(StringUtils.byteToHexString(DistributedCache.createMD5(tfiles[0], job)));
-        for (int i = 1; i < tfiles.length; i++) {
-            md5Files.append(",");
-            md5Files.append(StringUtils.byteToHexString(DistributedCache
-                                          .createMD5(tfiles[i], job)));
-        }
-        DistributedCache.setFileMd5(job, md5Files.toString());
-        //"mapred.cache.filemd5", md5Files);
+      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
+    }
+
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if (tfiles != null) {
+      StringBuffer fileTimestamps = 
+        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
+      for (int i = 1; i < tfiles.length; i++) {
+        fileTimestamps.append(",");
+        fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
       }
+      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
     }
        
     String originalJarPath = job.getJar();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=557790&r1=557789&r2=557790
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Jul 19 15:01:14 2007
@@ -110,20 +110,24 @@
       URI[] files = DistributedCache.getCacheFiles(conf);
       if ((archives != null) || (files != null)) {
         if (archives != null) {
-          String[] md5 = DistributedCache.getArchiveMd5(conf);
+          String[] archivesTimestamps = DistributedCache.getArchiveTimestamps(conf);
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length;i++){
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
+                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()),
+                                                  true, Long.parseLong(archivesTimestamps[i]),
+                                                  new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
         if ((files != null)) {
-          String[] md5 = DistributedCache.getFileMd5(conf);
+          String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length;i++){
-            p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
-                                                                                    .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
+            p[i] = DistributedCache.getLocalCache(files[i], conf, 
+                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()),
+                                                  false, Long.parseLong(fileTimestamps[i]),
+                                                  new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }

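One invariant both sides rely on: Configuration.getStrings splits the
comma-separated value in order, so the timestamp at index i must correspond to
the cache file at index i. A tiny illustration (the values here are examples
only; the configuration key is the one the patch introduces):

    import org.apache.hadoop.conf.Configuration;

    public class TimestampOrderSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("mapred.cache.files.timestamps",
                 "1184882474000,1184882501000");
        // getStrings splits on commas preserving insertion order,
        // so stamps[i] pairs with the i-th entry of the cache-file list
        String[] stamps = conf.getStrings("mapred.cache.files.timestamps");
        long first = Long.parseLong(stamps[0]);
        System.out.println("first file's recorded mtime: " + first);
      }
    }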

