hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r885373 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Date Mon, 30 Nov 2009 11:00:40 GMT
Author: yhemanth
Date: Mon Nov 30 11:00:40 2009
New Revision: 885373

URL: http://svn.apache.org/viewvc?rev=885373&view=rev
Log:
MAPREDUCE-1140. Fix DistributedCache to not decrement reference counts for unreferenced files
in error conditions. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=885373&r1=885372&r2=885373&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Nov 30 11:00:40 2009
@@ -94,6 +94,10 @@
    
     MAPREDUCE-1244. Fix eclipse-plugin's build dependencies. (gkesavan)
 
+    MAPREDUCE-1140. Fix DistributedCache to not decrement reference counts for
+    unreferenced files in error conditions.
+    (Amareshwari Sriramadasu via yhemanth)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=885373&r1=885372&r2=885373&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Mon Nov 30 11:00:40 2009
@@ -32,7 +32,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -72,6 +71,7 @@
     final long timestamp;
     /** Whether this is to be added to the classpath */
     final boolean shouldBeAddedToClassPath;
+    boolean localized = false;
 
     private CacheFile(URI uri, FileType type, long timestamp, 
         boolean classPath) {
@@ -108,6 +108,14 @@
       }
       return ret;
     }
+    
+    boolean getLocalized() {
+      return localized;
+    }
+    
+    void setLocalized(boolean val) {
+      localized = val;
+    }
   }
 
   TaskDistributedCacheManager(
@@ -156,6 +164,7 @@
           cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
           cacheFile.timestamp, workdirPath, false);
+      cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
         localArchives.add(p);
@@ -179,6 +188,13 @@
 
   }
 
+  /*
+   * This method is called from unit tests.
+   */
+  List<CacheFile> getCacheFiles() {
+    return cacheFiles;
+  }
+  
   private static String stringifyPathList(List<Path> p){
     if (p == null || p.isEmpty()) {
       return null;
@@ -210,7 +226,9 @@
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+      if (c.getLocalized()) {
+        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=885373&r1=885372&r2=885373&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Mon Nov 30 11:00:40 2009
@@ -131,39 +131,51 @@
       lcacheStatus.refcount++;
     }
     
-    // do the localization, after releasing the global lock
-    synchronized (lcacheStatus) {
-      if (!lcacheStatus.isInited()) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-            fileStatus, isArchive);
-        lcacheStatus.initComplete();
-      } else {
-        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
-            lcacheStatus, fileStatus, isArchive);
+    boolean initSuccessful = false;
+    try {
+      // do the localization, after releasing the global lock
+      synchronized (lcacheStatus) {
+        if (!lcacheStatus.isInited()) {
+          localizedPath = localizeCache(conf, cache, confFileStamp,
+              lcacheStatus, fileStatus, isArchive);
+          lcacheStatus.initComplete();
+        } else {
+          localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+              lcacheStatus, fileStatus, isArchive);
+        }
+        createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
+            honorSymLinkConf);
       }
-      createSymlink(conf, cache, lcacheStatus, isArchive,
-          currentWorkDir, honorSymLinkConf);
-    }
 
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (lcacheStatus) {
-      synchronized (baseDirSize) {
-        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
-        if ( get != null ) {
-         size = get.longValue();
-        } else {
-          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+      // try deleting stuff if you can
+      long size = 0;
+      synchronized (lcacheStatus) {
+        synchronized (baseDirSize) {
+          Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+          if (get != null) {
+            size = get.longValue();
+          } else {
+            LOG.warn("Cannot find size of baseDir: "
+                + lcacheStatus.getBaseDir());
+          }
+        }
+      }
+      // setting the cache size to a default of 10GB
+      long allowedSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
+          DEFAULT_CACHE_SIZE);
+      if (allowedSize < size) {
+        // try some cache deletions
+        deleteCache(conf);
+      }
+      initSuccessful = true;
+      return localizedPath;
+    } finally {
+      if (!initSuccessful) {
+        synchronized (cachedArchives) {
+          lcacheStatus.refcount--;
         }
       }
     }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE, DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
   }
 
   /**
@@ -190,6 +202,21 @@
     }
   }
 
+  /*
+   * This method is called from unit tests. 
+   */
+  int getReferenceCount(URI cache, Configuration conf, long timeStamp) 
+    throws IOException {
+    String key = getKey(cache, conf, timeStamp);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        throw new IOException("Cannot find localized cache: " + cache);
+      }
+      return lcacheStatus.refcount;
+    }
+  }
+
   // To delete the caches which have a refcount of zero
 
   private void deleteCache(Configuration conf) throws IOException {
@@ -317,7 +344,7 @@
   
   // the method which actually copies the caches locally and unjars/unzips them
   // and does chmod for the files
-  private Path localizeCache(Configuration conf,
+  Path localizeCache(Configuration conf,
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
                                     FileStatus fileStatus,
@@ -456,7 +483,7 @@
     }
   }
 
-  private static class CacheStatus {
+  static class CacheStatus {
     // the local load path of this cache
     Path localLoadPath;
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=885373&r1=885372&r2=885373&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Mon Nov 30 11:00:40 2009
@@ -21,8 +21,6 @@
 import java.io.File;
 import java.io.IOException;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -87,19 +85,9 @@
     super.tearDown();
   }
 
-  /**
-   * Test the control flow of distributed cache manager when LinuxTaskController
-   * is used.
-   */
   @Override
-  public void testManagerFlow()
-      throws IOException,
-      LoginException {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testManagerFlow();
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
   }
 
   @Override
@@ -165,30 +153,4 @@
       path = path.getParentFile();
     }
   }
-
-  @Override
-  public void testDeleteCache()
-      throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testDeleteCache();
-  }
-
-  @Override
-  public void testFileSystemOtherThanDefault()
-      throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testFileSystemOtherThanDefault();
-  }
-  
-  @Override
-  public void testFreshness()  throws Exception { 
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testFreshness();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=885373&r1=885372&r2=885373&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Mon Nov 30 11:00:40 2009
@@ -22,6 +22,8 @@
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Random;
 
 import javax.security.auth.login.LoginException;
@@ -34,6 +36,8 @@
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +49,7 @@
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.mortbay.log.Log;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
 
@@ -94,12 +99,24 @@
   }
 
   /**
+   * Whether the test can run on the machine
+   * 
+   * @return true if test can run on the machine, false otherwise
+   */
+  protected boolean canRun() {
+    return true;
+  }
+  
+  /**
    * This is the typical flow for using the DistributedCache classes.
    * 
    * @throws IOException
    * @throws LoginException
    */
   public void testManagerFlow() throws IOException, LoginException {
+    if (!canRun()) {
+      return;
+    }
 
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
@@ -153,6 +170,99 @@
   }
 
   /**
+   * This DistributedCacheManager fails in localizing firstCacheFile.
+   */
+  public class FakeTrackerDistributedCacheManager extends
+      TrackerDistributedCacheManager {
+    public FakeTrackerDistributedCacheManager(Configuration conf)
+        throws IOException {
+      super(conf);
+    }
+
+    @Override
+    Path localizeCache(Configuration conf, URI cache, long confFileStamp,
+        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
+        throws IOException {
+      if (cache.equals(firstCacheFile.toUri())) {
+        throw new IOException("fake fail");
+      }
+      return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
+          fileStatus, isArchive);
+    }
+  }
+
+  public void testReferenceCount() throws IOException, LoginException,
+      URISyntaxException {
+    if (!canRun()) {
+      return;
+    }
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    TrackerDistributedCacheManager manager = 
+      new FakeTrackerDistributedCacheManager(conf);
+    Cluster cluster = new Cluster(conf);
+    String userName = getJobOwnerName();
+    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+
+    // Configures a job with a regular file
+    Job job1 = Job.getInstance(cluster, conf);
+    job1.addCacheFile(secondCacheFile.toUri());
+    Configuration conf1 = job1.getConfiguration();
+    TrackerDistributedCacheManager.determineTimestamps(conf1);
+
+    // Task localizing for first job
+    TaskDistributedCacheManager handle = manager
+        .newTaskDistributedCacheManager(conf1);
+    handle.setup(localDirAllocator, workDir, TaskTracker
+          .getDistributedCacheDir(userName));
+    handle.release();
+    for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
+    }
+    
+    Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+    createTempFile(thirdCacheFile);
+    
+    // Configures another job with three regular files.
+    Job job2 = Job.getInstance(cluster, conf);
+    // add a file that would get failed to localize
+    job2.addCacheFile(firstCacheFile.toUri());
+    // add a file that is already localized by different job
+    job2.addCacheFile(secondCacheFile.toUri());
+    // add a file that is never localized
+    job2.addCacheFile(thirdCacheFile.toUri());
+    Configuration conf2 = job2.getConfiguration();
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+
+    // Task localizing for second job
+    // localization for the "firstCacheFile" will fail.
+    handle = manager.newTaskDistributedCacheManager(conf2);
+    Throwable th = null;
+    try {
+      handle.setup(localDirAllocator, workDir, TaskTracker
+          .getDistributedCacheDir(userName));
+    } catch (IOException e) {
+      th = e;
+      Log.info("Exception during setup", e);
+    }
+    assertNotNull(th);
+    assertTrue(th.getMessage().contains("fake fail"));
+    handle.release();
+    th = null;
+    for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+      try {
+        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
+      } catch (IOException ie) {
+        th = ie;
+        Log.info("Exception getting reference count for " + c.uri, ie);
+      }
+    }
+    assertNotNull(th);
+    assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
+    fs.delete(thirdCacheFile, false);
+  }
+
+  /**
    * Check proper permissions on the cache files
    * 
    * @param localCacheFiles
@@ -180,6 +290,9 @@
 
   /** test delete cache */
   public void testDeleteCache() throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf);
     FileSystem localfs = FileSystem.getLocal(conf);
@@ -204,6 +317,9 @@
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TrackerDistributedCacheManager manager =
       new TrackerDistributedCacheManager(conf);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
@@ -262,6 +378,9 @@
   }
   
   public void testFreshness() throws Exception {
+    if (!canRun()) {
+      return;
+    }
     Configuration myConf = new Configuration(conf);
     myConf.set("fs.default.name", "refresh:///");
     myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);



Mime
View raw message