hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077064 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/filecache/DistributedCache.java mapred/org/apache/hadoop/mapred/TaskRunner.java
Date Fri, 04 Mar 2011 03:36:57 GMT
Author: omalley
Date: Fri Mar  4 03:36:57 2011
New Revision: 1077064

URL: http://svn.apache.org/viewvc?rev=1077064&view=rev
Log:
commit f2194ab0064e82e4d029763760d7af4680cee9f6
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date:   Tue Dec 1 09:15:25 2009 +0530

    MAPREDUCE-1140 from https://issues.apache.org/jira/secure/attachment/12426383/patch-1140-2-ydist.txt
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1140. Fix DistributedCache to not decrement reference counts for
    +    unreferenced files in error conditions.
    +    (Amareshwari Sriramadasu via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=1077064&r1=1077063&r2=1077064&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  4 03:36:57 2011
@@ -216,38 +216,49 @@ public class DistributedCache {
       lcacheStatus.refcount++;
     }
     
-    synchronized (lcacheStatus) {
-      if (!lcacheStatus.isInited()) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-          fileStatus, isArchive);
-        lcacheStatus.initComplete();
-      } else {
-        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
-          lcacheStatus, fileStatus, isArchive);
-      }
-      createSymlink(conf, cache, lcacheStatus, isArchive,
-        currentWorkDir, honorSymLinkConf);
-    }
- 
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (lcacheStatus) {
-      synchronized (baseDirSize) {
-        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
-        if ( get != null ) {
-    	    size = get.longValue();
+    boolean initSuccessful = false;
+    try {
+      synchronized (lcacheStatus) {
+        if (!lcacheStatus.isInited()) {
+          localizedPath = localizeCache(conf, cache, confFileStamp,
+              lcacheStatus, fileStatus, isArchive);
+          lcacheStatus.initComplete();
         } else {
-          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+          localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+              lcacheStatus, fileStatus, isArchive);
+        }
+        createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
+            honorSymLinkConf);
+      }
+
+      // try deleting stuff if you can
+      long size = 0;
+      synchronized (lcacheStatus) {
+        synchronized (baseDirSize) {
+          Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+          if (get != null) {
+            size = get.longValue();
+          } else {
+            LOG.warn("Cannot find size of baseDir: "
+                + lcacheStatus.getBaseDir());
+          }
+        }
+      }
+      // setting the cache size to a default of 10GB
+      long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+      if (allowedSize < size) {
+        // try some cache deletions
+        deleteCache(conf);
+      }
+      initSuccessful = true;
+      return localizedPath;
+    } finally {
+      if (!initSuccessful) {
+        synchronized (cachedArchives) {
+          lcacheStatus.refcount--;
         }
       }
     }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
   }
 
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077064&r1=1077063&r2=1077064&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:36:57 2011
@@ -140,9 +140,19 @@ abstract class TaskRunner extends Thread
     return jobConf.get(JobConf.MAPRED_TASK_ENV);
   }
   
+  private static class CacheFile {
+    URI uri;
+    long timeStamp;
+    CacheFile (URI uri, long timeStamp) {
+      this.uri = uri;
+      this.timeStamp = timeStamp;
+    }
+  }
+  
   @Override
   public final void run() {
     String errorInfo = "Child Error";
+    List<CacheFile> localizedCacheFiles = new ArrayList<CacheFile>();
     try {
       
       //before preparing the job localize 
@@ -187,6 +197,8 @@ abstract class TaskRunner extends Thread
                                                         getAbsolutePath()), 
                                                   false,
                                                   lDirAlloc);
+            localizedCacheFiles.add(new CacheFile(archives[i], Long
+                .parseLong(archivesTimestamps[i])));
             
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -207,6 +219,8 @@ abstract class TaskRunner extends Thread
                                                         getAbsolutePath()), 
                                                   false,
                                                   lDirAlloc);
+            localizedCacheFiles.add(new CacheFile(files[i], Long
+                .parseLong(fileTimestamps[i])));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
@@ -523,22 +537,8 @@ abstract class TaskRunner extends Thread
       }
     } finally {
       try{
-        URI[] archives = DistributedCache.getCacheArchives(conf);
-        URI[] files = DistributedCache.getCacheFiles(conf);
-        String[] archivesTimestamps = 
-          DistributedCache.getArchiveTimestamps(conf);
-        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
-        if (archives != null){
-          for (int i = 0; i < archives.length; i++){
-            DistributedCache.releaseCache(archives[i], conf,
-              Long.parseLong(archivesTimestamps[i]));
-          }
-        }
-        if (files != null){
-          for(int i = 0; i < files.length; i++){
-            DistributedCache.releaseCache(files[i], conf,
-              Long.parseLong(fileTimestamps[i]));
-          }
+        for (CacheFile cf : localizedCacheFiles){
+          DistributedCache.releaseCache(cf.uri, conf, cf.timeStamp);
         }
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");



Mime
View raw message