hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077703 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/filecache/ test/org/apache/hadoop/filecache/
Date Fri, 04 Mar 2011 04:46:15 GMT
Author: omalley
Date: Fri Mar  4 04:46:14 2011
New Revision: 1077703

URL: http://svn.apache.org/viewvc?rev=1077703&view=rev
Log:
commit d0d71d53f16f0db8d6768aa8d0dff74ce104bb29
Author: Owen O'Malley <omalley@apache.org>
Date:   Sun Sep 19 00:39:01 2010 -0700

    add a link from CacheFiles to the CacheStatus

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077703&r1=1077702&r2=1077703&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java	Fri Mar  4 04:46:14 2011
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager.CacheStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -72,9 +73,10 @@ public class TaskDistributedCacheManager
     boolean localized = false;
     /** The owner of the localized file. Relevant only on the tasktrackers */
     final String owner;
+    private CacheStatus status;
 
-    private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
-        boolean classPath) throws IOException {
+    CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
+                      boolean classPath) throws IOException {
       this.uri = uri;
       this.type = type;
       this.isPublic = isPublic;
@@ -85,6 +87,21 @@ public class TaskDistributedCacheManager
     }
 
     /**
+     * Set the status for this cache file.
+     * @param status
+     */
+    public void setStatus(CacheStatus status) {
+      this.status = status;
+    }
+    
+    /**
+     * Get the status for this cache file.
+     * @return the status object
+     */
+    public CacheStatus getStatus() {
+      return status;
+    }
+    /**
      * Converts the scheme used by DistributedCache to serialize what files to
      * cache in the configuration into CacheFile objects that represent those 
      * files.
@@ -167,12 +184,12 @@ public class TaskDistributedCacheManager
         p = distributedCacheManager.getLocalCache(uri, taskConf,
             publicCacheSubdir, fileStatus, 
             cacheFile.type == CacheFile.FileType.ARCHIVE,
-            cacheFile.timestamp, cacheFile.isPublic);
+            cacheFile.timestamp, cacheFile.isPublic, cacheFile);
       } else {
         p = distributedCacheManager.getLocalCache(uri, taskConf,
             privateCacheSubdir, fileStatus, 
             cacheFile.type == CacheFile.FileType.ARCHIVE,
-            cacheFile.timestamp, cacheFile.isPublic);
+            cacheFile.timestamp, cacheFile.isPublic, cacheFile);
       }
       cacheFile.setLocalized(true);
 
@@ -257,9 +274,8 @@ public class TaskDistributedCacheManager
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      if (c.getLocalized()) {
-        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp, 
-            c.owner);
+      if (c.getLocalized() && c.status != null) {
+        distributedCacheManager.releaseCache(c.status);
       }
     }
   }
@@ -267,9 +283,8 @@ public class TaskDistributedCacheManager
   public void setSizes(long[] sizes) throws IOException {
     int i = 0;
     for (CacheFile c: cacheFiles) {
-      if (!c.isPublic) {
-        distributedCacheManager.setSize(c.uri, taskConf, c.timestamp, c.owner, 
-                                        sizes[i++]);
+      if (!c.isPublic && c.status != null) {
+        distributedCacheManager.setSize(c.status, sizes[i++]);
       }
     }
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077703&r1=1077702&r2=1077703&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java	Fri Mar  4 04:46:14 2011
@@ -36,6 +36,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager.CacheFile;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -139,10 +140,9 @@ public class TrackerDistributedCacheMana
    * @throws IOException
    */
   Path getLocalCache(URI cache, Configuration conf,
-      String subDir, FileStatus fileStatus,
-      boolean isArchive, long confFileStamp,
-      boolean isPublic)
-      throws IOException {
+                     String subDir, FileStatus fileStatus,
+                     boolean isArchive, long confFileStamp,
+                     boolean isPublic, CacheFile file) throws IOException {
     String key;
     String user = getLocalizedCacheOwner(isPublic);
     key = getKey(cache, conf, confFileStamp, user);
@@ -168,8 +168,11 @@ public class TrackerDistributedCacheMana
         cachedArchives.put(key, lcacheStatus);
       }
 
-      //mark the cache for use. 
-      lcacheStatus.refcount++;
+      //mark the cache for use.
+      file.setStatus(lcacheStatus);
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount++;
+      }
     }
     
     boolean initSuccessful = false;
@@ -234,50 +237,26 @@ public class TrackerDistributedCacheMana
    * @param owner the owner of the localized file
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        LOG.warn("Cannot find localized cache: " + cache + 
-                 " (key: " + key + ") in releaseCache!");
-        return;
-      }
-      
-      // decrement ref count 
-      lcacheStatus.refcount--;
+  void releaseCache(CacheStatus status) throws IOException {
+	synchronized (status) {
+      status.refcount--;
     }
   }
 
-  void setSize(URI cache, Configuration conf, long timeStamp,
-               String owner, long size) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        LOG.warn("Cannot find localized cache: " + cache + 
-                 " (key: " + key + ") in setSize!");
-        return;
-      }
-      lcacheStatus.size = size;
-      addCacheInfoUpdate(lcacheStatus);
+  void setSize(CacheStatus status, long size) throws IOException {
+    synchronized (status) {
+      status.size = size;
+      addCacheInfoUpdate(status);
     }
   }
 
   /*
    * This method is called from unit tests. 
    */
-  int getReferenceCount(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        throw new IOException("Cannot find localized cache: " + cache);
-      }
-      return lcacheStatus.refcount;
-    }
+  int getReferenceCount(CacheStatus status) throws IOException {
+	synchronized (status) {
+	  return status.refcount;
+	}
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077703&r1=1077702&r2=1077703&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java	Fri Mar  4 04:46:14 2011
@@ -33,6 +33,7 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager.CacheFile;
 import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobLocalizer;
@@ -255,8 +256,7 @@ public class TestTrackerDistributedCache
     JobLocalizer.downloadPrivateCache(conf1);
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
-      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp, 
-          c.owner));
+      assertEquals(0, manager.getReferenceCount(c.getStatus()));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -294,17 +294,15 @@ public class TestTrackerDistributedCache
     th = null;
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       try {
-        int refcount = manager.getReferenceCount(c.uri, conf2, c.timestamp, 
-                                                 c.owner);
+        int refcount = manager.getReferenceCount(c.getStatus());
         LOG.info("checking refcount " + c.uri + " of " + refcount);
         assertEquals(0, refcount);
-      } catch (IOException ie) {
+      } catch (NullPointerException ie) {
         th = ie;
         LOG.info("Exception getting reference count for " + c.uri, ie);
       }
     }
     assertNotNull(th);
-    assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
     fs.delete(thirdCacheFile, false);
   }
   
@@ -417,8 +415,8 @@ public class TestTrackerDistributedCache
     }
     Path localizedPath =
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
-          fs.getFileStatus(cacheFile), false,
-          c.timestamp, visibility);
+                            fs.getFileStatus(cacheFile), false,
+    		                c.timestamp, visibility, c);
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
@@ -530,21 +528,31 @@ public class TestTrackerDistributedCache
     conf2.set("user.name", userName);
 
     // We first test the size limit
+    FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
+    CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(), 
+    		                         CacheFile.FileType.REGULAR, true, 
+    		                         stat.getModificationTime(),
+    		                         true); 
     Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFilePublic), false,
-        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
-    manager.releaseCache(firstCacheFilePublic.toUri(), conf2, 
-        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+        cfile1);
+    manager.releaseCache(cfile1.getStatus());
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
+    stat = fs.getFileStatus(secondCacheFilePublic);
+    CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(), 
+    		                         CacheFile.FileType.REGULAR, true, 
+    		                         stat.getModificationTime(),
+    		                         true); 
     Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(secondCacheFilePublic), false, 
-        fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
+        fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+        cfile2);
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
         localfs.exists(firstLocalCache));
@@ -583,21 +591,29 @@ public class TestTrackerDistributedCache
     // limit but does not trigger the file size limit.
     createPublicTempFile(thirdCacheFile);
     createPublicTempFile(fourthCacheFile);
+    stat = fs.getFileStatus(thirdCacheFile);
+    CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), 
+            CacheFile.FileType.REGULAR, false, 
+            stat.getModificationTime(),
+            true); 
     Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(thirdCacheFile), false,
         fs.getFileStatus(thirdCacheFile).getModificationTime(), 
-        true);
+        true, cfile3);
     // Release the third cache so that it can be deleted while sweeping
-    manager.releaseCache(thirdCacheFile.toUri(), conf2, 
-        fs.getFileStatus(thirdCacheFile).getModificationTime(), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+    manager.releaseCache(cfile3.getStatus());
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
+    stat = fs.getFileStatus(fourthCacheFile);
+    CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(), 
+            CacheFile.FileType.REGULAR, false, 
+            stat.getModificationTime(),
+            true); 
     Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
-        fs.getFileStatus(fourthCacheFile).getModificationTime(), true);
+        fs.getFileStatus(fourthCacheFile).getModificationTime(), true, cfile4);
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache exceeds the number of sub directories limit.",
         localfs.exists(thirdLocalCache));
@@ -624,11 +640,14 @@ public class TestTrackerDistributedCache
     conf.set("user.name", userName);
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
+    CacheFile file = new CacheFile(fileToCache.toUri(), 
+    		                       CacheFile.FileType.REGULAR, 
+    		                       false, 0, false);
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
         System.currentTimeMillis(),
-        false);
+        false, file);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -802,21 +821,31 @@ public class TestTrackerDistributedCache
     long now = System.currentTimeMillis();
 
     Path[] localCache = new Path[2];
+    FileStatus stat = fs.getFileStatus(firstCacheFile);
+    CacheFile file = new CacheFile(firstCacheFilePublic.toUri(), 
+    		                       CacheFile.FileType.REGULAR, true, 
+    		                       stat.getModificationTime(), false);
     localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFilePublic), false,
-        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+        file);
     FsPermission myPermission = new FsPermission((short)0600);
     Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
     if (FileSystem.create(localfs, myFile, myPermission) == null) {
       throw new IOException("Could not create " + myFile);
     }
     try {
+      stat = fs.getFileStatus(secondCacheFilePublic);
+      file = new CacheFile(secondCacheFilePublic.toUri(),
+    		               CacheFile.FileType.REGULAR,
+    		               true, stat.getModificationTime(), false);
       localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf, 
           TaskTracker.getPrivateDistributedCacheDir(userName),
           fs.getFileStatus(secondCacheFilePublic), false, 
-          fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
-      FileStatus stat = localfs.getFileStatus(myFile);
+          fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+          file);
+      stat = localfs.getFileStatus(myFile);
       assertTrue(stat.getPermission().equals(myPermission));
       // validate permissions of localized files.
       checkFilePermissions(localCache);



Mime
View raw message