hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r1577391 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ hadoop-yarn/hadoop-yarn-server/ha...
Date Fri, 14 Mar 2014 00:30:35 GMT
Author: cdouglas
Date: Fri Mar 14 00:30:35 2014
New Revision: 1577391

URL: http://svn.apache.org/r1577391
Log:
YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. Contributed by Sangjin Lee


Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Mar 14 00:30:35 2014
@@ -288,6 +288,9 @@ Release 2.4.0 - UNRELEASED
     expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. (Mayank
     Bansal via zjshen)
 
+    YARN-1771. Reduce the number of NameNode operations during localization of
+    public resources using a cache. (Sangjin Lee via cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Fri Mar 14 00:30:35 2014
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +45,11 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Futures;
+
 /**
  * Download a single URL to the local disk.
  *
@@ -56,6 +63,7 @@ public class FSDownload implements Calla
   private final UserGroupInformation userUgi;
   private Configuration conf;
   private LocalResource resource;
+  private final LoadingCache<Path,Future<FileStatus>> statCache;
   
   /** The local FS dir path under which this resource is to be localized to */
   private Path destDirPath;
@@ -71,11 +79,18 @@ public class FSDownload implements Calla
 
   public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
       Path destDirPath, LocalResource resource) {
+    this(files, ugi, conf, destDirPath, resource, null);
+  }
+
+  public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
+      Path destDirPath, LocalResource resource,
+      LoadingCache<Path,Future<FileStatus>> statCache) {
     this.conf = conf;
     this.destDirPath = destDirPath;
     this.files = files;
     this.userUgi = ugi;
     this.resource = resource;
+    this.statCache = statCache;
   }
 
   LocalResource getResource() {
@@ -90,28 +105,43 @@ public class FSDownload implements Calla
   }
 
   /**
-   * Returns a boolean to denote whether a cache file is visible to all(public)
+   * Creates the cache loader for the status loading cache. This should be used
+   * to create an instance of the status cache that is passed into the
+   * FSDownload constructor.
+   */
+  public static CacheLoader<Path,Future<FileStatus>>
+      createStatusCacheLoader(final Configuration conf) {
+    return new CacheLoader<Path,Future<FileStatus>>() {
+      public Future<FileStatus> load(Path path) { // runs on a cache miss
+        try {
+          FileSystem fs = path.getFileSystem(conf);
+          return Futures.immediateFuture(fs.getFileStatus(path));
+        } catch (Throwable th) { // NOTE(review): also catches Errors
+          // return the failure as a future so the cache memoizes it too
+          return Futures.immediateFailedFuture(th);
+        }
+      }
+    };
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
    * or not
-   * @param conf
-   * @param uri
-   * @return true if the path in the uri is visible to all, false otherwise
-   * @throws IOException
+   *
+   * @return true if the path in the current path is visible to all, false
+   * otherwise
    */
-  private static boolean isPublic(FileSystem fs, Path current) throws IOException {
+  @VisibleForTesting
+  static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+      LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
     current = fs.makeQualified(current);
     //the leaf level file should be readable by others
-    if (!checkPublicPermsForAll(fs, current, FsAction.READ_EXECUTE, FsAction.READ)) {
+    if (!checkPublicPermsForAll(fs, sStat, FsAction.READ_EXECUTE, FsAction.READ)) {
       return false;
     }
-    return ancestorsHaveExecutePermissions(fs, current.getParent());
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
   }
 
-  private static boolean checkPublicPermsForAll(FileSystem fs, Path current, 
-      FsAction dir, FsAction file) 
-    throws IOException {
-    return checkPublicPermsForAll(fs, fs.getFileStatus(current), dir, file);
-  }
-    
   private static boolean checkPublicPermsForAll(FileSystem fs, 
         FileStatus status, FsAction dir, FsAction file) 
     throws IOException {
@@ -137,12 +167,13 @@ public class FSDownload implements Calla
    * permission set for all users (i.e. that other users can traverse
    * the directory heirarchy to the given path)
    */
-  private static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
-    throws IOException {
+  private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
+      Path path, LoadingCache<Path,Future<FileStatus>> statCache)
+      throws IOException {
     Path current = path;
     while (current != null) {
       //the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false;
       }
       current = current.getParent();
@@ -160,14 +191,46 @@ public class FSDownload implements Calla
    * @throws IOException
    */
   private static boolean checkPermissionOfOther(FileSystem fs, Path path,
-      FsAction action) throws IOException {
-    FileStatus status = fs.getFileStatus(path);
+      FsAction action, LoadingCache<Path,Future<FileStatus>> statCache)
+      throws IOException {
+    FileStatus status = getFileStatus(fs, path, statCache);
     FsPermission perms = status.getPermission();
     FsAction otherAction = perms.getOtherAction();
     return otherAction.implies(action);
   }
 
-  
+  /**
+   * Obtains the file status of the path: directly from fs when there is no
+   * stat cache, otherwise via the stat cache (whose loader runs on a miss;
+   * fs is not used on that path). Failures surface as IOException.
+   *
+   * The stat cache is expected to be managed by callers who provided it to
+   * FSDownload.
+   */
+  private static FileStatus getFileStatus(final FileSystem fs, final Path path,
+      LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
+    // if the stat cache does not exist, simply query the filesystem
+    if (statCache == null) {
+      return fs.getFileStatus(path);
+    }
+
+    try {
+      // a hit returns the memoized future; a miss invokes the loader first
+      return statCache.get(path).get();
+    } catch (ExecutionException e) { // failed future or loader exception
+      Throwable cause = e.getCause();
+      // normally an IOException from the stat call; rethrow it unwrapped
+      if (cause instanceof IOException) {
+        throw (IOException)cause;
+      } else {
+        throw new IOException(cause);
+      }
+    } catch (InterruptedException e) { // unlikely: futures expected complete
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
   private Path copy(Path sCopy, Path dstdir) throws IOException {
     FileSystem sourceFs = sCopy.getFileSystem(conf);
     Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
@@ -178,14 +241,15 @@ public class FSDownload implements Calla
           ", was " + sStat.getModificationTime());
     }
     if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
-      if (!isPublic(sourceFs, sCopy)) {
+      if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
         throw new IOException("Resource " + sCopy +
             " is not publicly accessable and as such cannot be part of the" +
             " public cache.");
       }
     }
-    
-    sourceFs.copyToLocalFile(sCopy, dCopy);
+
+    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
+        true, conf);
     return dCopy;
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Fri Mar 14 00:30:35 2014
@@ -21,27 +21,35 @@ package org.apache.hadoop.yarn.util;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
+import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
 
@@ -64,10 +72,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 public class TestFSDownload {
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -88,6 +99,18 @@ public class TestFSDownload {
 
   static LocalResource createFile(FileContext files, Path p, int len,
       Random r, LocalResourceVisibility vis) throws IOException {
+    createFile(files, p, len, r);
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.FILE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(p).getModificationTime());
+    return ret;
+  }
+
+  static void createFile(FileContext files, Path p, int len, Random r)
+      throws IOException {
     FSDataOutputStream out = null;
     try {
       byte[] bytes = new byte[len];
@@ -97,13 +120,6 @@ public class TestFSDownload {
     } finally {
       if (out != null) out.close();
     }
-    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
-    ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
-    ret.setSize(len);
-    ret.setType(LocalResourceType.FILE);
-    ret.setVisibility(vis);
-    ret.setTimestamp(files.getFileStatus(p).getModificationTime());
-    return ret;
   }
 
   static LocalResource createJar(FileContext files, Path p,
@@ -285,6 +301,76 @@ public class TestFSDownload {
     }
   }
 
+  @Test (timeout=60000)
+  public void testDownloadPublicWithStatCache() throws IOException,
+      URISyntaxException, InterruptedException, ExecutionException {
+    final Configuration conf = new Configuration();
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    Path basedir = files.makeQualified(new Path("target",
+      TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    int size = 512;
+
+    final ConcurrentMap<Path,AtomicInteger> counts =
+        new ConcurrentHashMap<Path,AtomicInteger>();
+    final CacheLoader<Path,Future<FileStatus>> loader =
+        FSDownload.createStatusCacheLoader(conf);
+    final LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(new CacheLoader<Path,Future<FileStatus>>() {
+      public Future<FileStatus> load(Path path) throws Exception {
+        // count loader invocations per path
+        AtomicInteger count = counts.get(path);
+        if (count == null) {
+          count = new AtomicInteger(0);
+          AtomicInteger existing = counts.putIfAbsent(path, count);
+          if (existing != null) {
+            count = existing;
+          }
+        }
+        count.incrementAndGet();
+
+        // use the default loader
+        return loader.load(path);
+      }
+    });
+
+    // test FSDownload.isPublic() concurrently
+    final int fileCount = 3;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < fileCount; i++) {
+      Random rand = new Random();
+      long sharedSeed = rand.nextLong();
+      rand.setSeed(sharedSeed);
+      System.out.println("SEED: " + sharedSeed);
+      final Path path = new Path(basedir, "test-file-" + i);
+      createFile(files, path, size, rand);
+      final FileSystem fs = path.getFileSystem(conf);
+      final FileStatus sStat = fs.getFileStatus(path);
+      tasks.add(new Callable<Boolean>() {
+        public Boolean call() throws IOException {
+          return FSDownload.isPublic(fs, path, sStat, statCache);
+        }
+      });
+    }
+
+    ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+    try {
+      List<Future<Boolean>> futures = exec.invokeAll(tasks);
+      // files should be public
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+      // for each path exactly one file status call should be made
+      for (AtomicInteger count: counts.values()) {
+        assertEquals(1, count.get());
+      }
+    } finally {
+      exec.shutdown();
+    }
+  }
+
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java Fri Mar 14 00:30:35 2014
@@ -18,20 +18,34 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
+import com.google.common.cache.LoadingCache;
+
 public class LocalizerContext {
 
   private final String user;
   private final ContainerId containerId;
   private final Credentials credentials;
+  private final LoadingCache<Path,Future<FileStatus>> statCache;
 
   public LocalizerContext(String user, ContainerId containerId,
       Credentials credentials) {
+    this(user, containerId, credentials, null);
+  }
+
+  public LocalizerContext(String user, ContainerId containerId,
+      Credentials credentials,
+      LoadingCache<Path,Future<FileStatus>> statCache) {
     this.user = user;
     this.containerId = containerId;
     this.credentials = credentials;
+    this.statCache = statCache;
   }
 
   public String getUser() {
@@ -46,4 +60,7 @@ public class LocalizerContext {
     return credentials;
   }
 
+  public LoadingCache<Path,Future<FileStatus>> getStatCache() {
+    return statCache;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Mar 14 00:30:35 2014
@@ -83,8 +83,8 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.util.Conve
 import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ResourceLocalizationService extends CompositeService
@@ -362,8 +364,11 @@ public class ResourceLocalizationService
   private void handleInitContainerResources(
       ContainerLocalizationRequestEvent rsrcReqs) {
     Container c = rsrcReqs.getContainer();
+    // create a loading cache for the file statuses
+    LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
     LocalizerContext ctxt = new LocalizerContext(
-        c.getUser(), c.getContainerId(), c.getCredentials());
+        c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
       rsrcReqs.getRequestedResources();
     for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
e :
@@ -680,7 +685,8 @@ public class ResourceLocalizationService
             // completing and being dequeued before pending updated
             synchronized (pending) {
               pending.put(queue.submit(new FSDownload(lfs, null, conf,
-                  publicDirDestPath, resource)), request);
+                  publicDirDestPath, resource, request.getContext().getStatCache())),
+                  request);
             }
           } catch (IOException e) {
             rsrc.unlock();



Mime
View raw message