hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077124 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/filecache/ test/org/apache/hadoop/filecache/ test/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 03:43:25 GMT
Author: omalley
Date: Fri Mar  4 03:43:24 2011
New Revision: 1077124

URL: http://svn.apache.org/viewvc?rev=1077124&view=rev
Log:
commit 61507d10e1103b878f72d7da49dfc08a6b3d61ed
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Sat Jan 23 20:27:58 2010 +0530

    MAPREDUCE:1140 from https://issues.apache.org/jira/secure/attachment/12431213/patch-1140-3-y20.txt
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1140. Fix DistributedCache to not decrement reference counts
    +    for unreferenced files in error conditions.
    +    (Amareshwari Sriramadasu via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar  4 03:43:24 2011
@@ -33,7 +33,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -73,6 +72,7 @@ public class TaskDistributedCacheManager
     final long timestamp;
     /** Whether this is to be added to the classpath */
     final boolean shouldBeAddedToClassPath;
+    boolean localized = false;
 
     private CacheFile(URI uri, FileType type, long timestamp, 
         boolean classPath) {
@@ -109,6 +109,14 @@ public class TaskDistributedCacheManager
       }
       return ret;
     }
+    
+    boolean getLocalized() {
+      return localized;
+    }
+    
+    void setLocalized(boolean val) {
+      localized = val;
+    }
   }
 
   TaskDistributedCacheManager(
@@ -157,6 +165,7 @@ public class TaskDistributedCacheManager
           cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
           cacheFile.timestamp, workdirPath, false);
+      cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
         localArchives.add(p);
@@ -179,6 +188,13 @@ public class TaskDistributedCacheManager
 
   }
 
+  /*
+   * This method is called from unit tests.
+   */
+  List<CacheFile> getCacheFiles() {
+    return cacheFiles;
+  }
+  
   private static String stringifyPathList(List<Path> p){
     if (p == null || p.isEmpty()) {
       return null;
@@ -210,7 +226,9 @@ public class TaskDistributedCacheManager
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+      if (c.getLocalized()) {
+        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 03:43:24 2011
@@ -129,39 +129,50 @@ public class TrackerDistributedCacheMana
       lcacheStatus.refcount++;
     }
     
-    // do the localization, after releasing the global lock
-    synchronized (lcacheStatus) {
-      if (!lcacheStatus.isInited()) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-            fileStatus, isArchive);
-        lcacheStatus.initComplete();
-      } else {
-        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
-            lcacheStatus, fileStatus, isArchive);
+    boolean initSuccessful = false;
+    try {
+      // do the localization, after releasing the global lock
+      synchronized (lcacheStatus) {
+        if (!lcacheStatus.isInited()) {
+          localizedPath = localizeCache(conf, cache, confFileStamp,
+              lcacheStatus, fileStatus, isArchive);
+          lcacheStatus.initComplete();
+        } else {
+          localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+              lcacheStatus, fileStatus, isArchive);
+        }
+        createSymlink(conf, cache, lcacheStatus, isArchive,
+            currentWorkDir, honorSymLinkConf);
       }
-      createSymlink(conf, cache, lcacheStatus, isArchive,
-          currentWorkDir, honorSymLinkConf);
-    }
 
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (lcacheStatus) {
-      synchronized (baseDirSize) {
-        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
-        if ( get != null ) {
-         size = get.longValue();
-        } else {
-          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+      // try deleting stuff if you can
+      long size = 0;
+      synchronized (lcacheStatus) {
+        synchronized (baseDirSize) {
+          Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+          if (get != null) {
+            size = get.longValue();
+          } else {
+            LOG.warn("Cannot find size of baseDir: "
+                + lcacheStatus.getBaseDir());
+          }
+        }
+      }
+      // setting the cache size to a default of 10GB
+      long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+      if (allowedSize < size) {
+        // try some cache deletions
+        deleteCache(conf);
+      }
+      initSuccessful = true;
+      return localizedPath;
+    } finally {
+      if (!initSuccessful) {
+        synchronized (cachedArchives) {
+          lcacheStatus.refcount--;
         }
       }
     }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
   }
 
   /**
@@ -188,6 +199,21 @@ public class TrackerDistributedCacheMana
     }
   }
 
+  /*
+   * This method is called from unit tests. 
+   */
+  int getReferenceCount(URI cache, Configuration conf, long timeStamp) 
+    throws IOException {
+    String key = getKey(cache, conf, timeStamp);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        throw new IOException("Cannot find localized cache: " + cache);
+      }
+      return lcacheStatus.refcount;
+    }
+  }
+
   // To delete the caches which have a refcount of zero
 
   private void deleteCache(Configuration conf) throws IOException {
@@ -294,7 +320,7 @@ public class TrackerDistributedCacheMana
 
   //the method which actually copies the caches locally and unjars/unzips them
   // and does chmod for the files
-  private Path localizeCache(Configuration conf,
+  Path localizeCache(Configuration conf,
                                       URI cache, long confFileStamp,
                                       CacheStatus cacheStatus,
                                       FileStatus fileStatus,
@@ -432,7 +458,7 @@ public class TrackerDistributedCacheMana
     }
   }
 
-  private static class CacheStatus {
+  static class CacheStatus {
     // the local load path of this cache
     Path localLoadPath;
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar  4 03:43:24 2011
@@ -22,18 +22,23 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Random;
 
 import javax.security.auth.login.LoginException;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,7 +50,10 @@ import org.apache.hadoop.filecache.TaskD
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.security.UserGroupInformation;
 
+
 public class TestTrackerDistributedCacheManager extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestTrackerDistributedCacheManager.class);
 
   protected String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp"),
@@ -94,12 +102,24 @@ public class TestTrackerDistributedCache
   }
 
   /**
+   * Whether the test can run on the machine
+   * 
+   * @return true if test can run on the machine, false otherwise
+   */
+  protected boolean canRun() {
+    return true;
+  }
+  
+  /**
    * This is the typical flow for using the DistributedCache classes.
    * 
    * @throws IOException
    * @throws LoginException
    */
   public void testManagerFlow() throws IOException, LoginException {
+    if (!canRun()) {
+      return;
+    }
 
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
@@ -153,6 +173,101 @@ public class TestTrackerDistributedCache
   }
 
   /**
+   * This DistributedCacheManager fails in localizing firstCacheFile.
+   */
+  public class FakeTrackerDistributedCacheManager extends
+      TrackerDistributedCacheManager {
+    public FakeTrackerDistributedCacheManager(Configuration conf)
+        throws IOException {
+      super(conf);
+    }
+
+    @Override
+    Path localizeCache(Configuration conf, URI cache, long confFileStamp,
+        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
+        throws IOException {
+      if (cache.equals(firstCacheFile.toUri())) {
+        throw new IOException("fake fail");
+      }
+      return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
+          fileStatus, isArchive);
+    }
+  }
+
+  public void testReferenceCount() throws IOException, LoginException,
+      URISyntaxException {
+    if (!canRun()) {
+      return;
+    }
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    TrackerDistributedCacheManager manager = 
+      new FakeTrackerDistributedCacheManager(conf);
+
+    String userName = getJobOwnerName();
+    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+
+    // Configures a job with a regular file
+    Job job1 = new Job(conf);
+    Configuration conf1 = job1.getConfiguration();
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
+    
+    TrackerDistributedCacheManager.determineTimestamps(conf1);
+
+    // Task localizing for first job
+    TaskDistributedCacheManager handle = manager
+        .newTaskDistributedCacheManager(conf1);
+    handle.setup(localDirAllocator, workDir, TaskTracker
+          .getDistributedCacheDir(userName));
+    handle.release();
+    for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
+    }
+    
+    Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+    createTempFile(thirdCacheFile);
+    
+    // Configures another job with three regular files.
+    Job job2 = new Job(conf);
+    Configuration conf2 = job2.getConfiguration();
+    // add a file that would get failed to localize
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
+    // add a file that is already localized by different job
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
+    // add a file that is never localized
+    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+    
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+
+    // Task localizing for second job
+    // localization for the "firstCacheFile" will fail.
+    handle = manager.newTaskDistributedCacheManager(conf2);
+    Throwable th = null;
+    try {
+      handle.setup(localDirAllocator, workDir, TaskTracker
+          .getDistributedCacheDir(userName));
+    } catch (IOException e) {
+      th = e;
+      LOG.info("Exception during setup", e);
+    }
+    assertNotNull(th);
+    assertTrue(th.getMessage().contains("fake fail"));
+    handle.release();
+    th = null;
+    for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+      try {
+        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
+      } catch (IOException ie) {
+        th = ie;
+        LOG.info("Exception getting reference count for " + c.uri, ie);
+      }
+    }
+    assertNotNull(th);
+    assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
+    fs.delete(thirdCacheFile, false);
+  }
+
+  /**
    * Check proper permissions on the cache files
    * 
    * @param localCacheFiles
@@ -180,6 +295,9 @@ public class TestTrackerDistributedCache
 
   /** test delete cache */
   public void testDeleteCache() throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf);
     FileSystem localfs = FileSystem.getLocal(conf);
@@ -204,6 +322,9 @@ public class TestTrackerDistributedCache
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TrackerDistributedCacheManager manager =
       new TrackerDistributedCacheManager(conf);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
@@ -262,6 +383,9 @@ public class TestTrackerDistributedCache
   }
   
   public void testFreshness() throws Exception {
+    if (!canRun()) {
+      return;
+    }
     Configuration myConf = new Configuration(conf);
     myConf.set("fs.default.name", "refresh:///");
     myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar  4 03:43:24 2011
@@ -21,8 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.File;
 import java.io.IOException;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -87,19 +85,9 @@ public class TestTrackerDistributedCache
     super.tearDown();
   }
 
-  /**
-   * Test the control flow of distributed cache manager when LinuxTaskController
-   * is used.
-   */
   @Override
-  public void testManagerFlow()
-      throws IOException,
-      LoginException {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testManagerFlow();
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
   }
 
   @Override
@@ -165,30 +153,4 @@ public class TestTrackerDistributedCache
       path = path.getParentFile();
     }
   }
-
-  @Override
-  public void testDeleteCache()
-      throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testDeleteCache();
-  }
-
-  @Override
-  public void testFileSystemOtherThanDefault()
-      throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testFileSystemOtherThanDefault();
-  }
-  
-  @Override
-  public void testFreshness()  throws Exception { 
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testFreshness();
-  }
 }



Mime
View raw message