Subject: svn commit: r1463824 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-yarn/hadoop-... 
Date: Wed, 03 Apr 2013 05:01:35 -0000
To: yarn-commits@hadoop.apache.org
From: vinodkv@apache.org

Author: vinodkv
Date: Wed Apr 3 05:01:35 2013
New Revision: 1463824

URL: http://svn.apache.org/r1463824
Log:
YARN-467. Modify public distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1463823 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
      - copied unchanged from r1463823, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
      - copied unchanged from r1463823, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Apr 3 05:01:35 2013 @@ -120,7 +120,11 @@ Release 2.0.5-beta - UNRELEASED YARN-382. SchedulerUtils improve way normalizeRequest sets the resource capabilities. (Zhijie Shen via bikas) - + + YARN-467. Modify public distributed cache to localize files such that no + local directory hits unix file count limits and thus prevent job failures. 
+ (Omkar Vinit Joshi via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Wed Apr 3 05:01:35 2013 @@ -256,4 +256,18 @@ + + + + + + + + + + + + + + Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Apr 3 05:01:35 2013 @@ -340,7 +340,15 @@ public class YarnConfiguration extends C /**List of directories to store localized files in.*/ public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; - + + /** + * Number of files in each localized directories + * Avoid tuning this too low. 
+ */ + public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = + NM_PREFIX + "local-cache.max-files-per-directory"; + public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = 8192; + /** Address where the localizer IPC is.*/ public static final String NM_LOCALIZER_ADDRESS = NM_PREFIX + "localizer.address"; Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Apr 3 05:01:35 2013 @@ -360,6 +360,25 @@ + It limits the maximum number of files which will be localized + in a single local directory. If the limit is reached then sub-directories + will be created and new files will be localized in them. If it is set to + a value less than or equal to 36 [which are sub-directories (0-9 and then + a-z)] then NodeManager will fail to start. For example; [for public + cache] if this is configured with a value of 40 ( 4 files + + 36 sub-directories) and the local-dir is "/tmp/local-dir1" then it will + allow 4 files to be created directly inside "/tmp/local-dir1/filecache". + For files that are localized further it will create a sub-directory "0" + inside "/tmp/local-dir1/filecache" and will localize files inside it + until it becomes full. If a file is removed from a sub-directory that + is marked full, then that sub-directory will be used back again to + localize files. 
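The arithmetic in the description above (36 slots reserved for the sub-directories "0"-"9" and "a"-"z", the remainder available for files) can be illustrated with a small Java sketch. This is only an illustration under stated assumptions: the class name CacheDirSketch and its method nextRelativePath are hypothetical and are not the LocalCacheDirectoryManager added by this commit. The sketch models only the cache root and the first level of sub-directories, and it does not model reusing a sub-directory after a file has been removed from it.

```java
// Hypothetical sketch, NOT the LocalCacheDirectoryManager from this commit.
final class CacheDirSketch {
    // Slots reserved in each directory for sub-directories "0"-"9" and "a"-"z".
    static final int SUBDIRS_PER_LEVEL = 36;

    private final int filesPerDir;     // files allowed directly inside one directory
    private int dirIndex = 0;          // 0 = cache root, 1 = "0", 2 = "1", ...
    private int filesInCurrentDir = 0; // files handed out for the current directory

    CacheDirSketch(int maxFilesPerDirectory) {
        // Mirrors the documented rule: a value of 36 or less is rejected.
        if (maxFilesPerDirectory <= SUBDIRS_PER_LEVEL) {
            throw new IllegalArgumentException("limit must be greater than 36");
        }
        this.filesPerDir = maxFilesPerDirectory - SUBDIRS_PER_LEVEL;
    }

    /** Returns "" for the cache root, or the name of a first-level sub-directory. */
    String nextRelativePath() {
        if (filesInCurrentDir == filesPerDir) {
            // Current directory is full: move on to the next sub-directory.
            dirIndex++;
            filesInCurrentDir = 0;
        }
        if (dirIndex > SUBDIRS_PER_LEVEL) {
            throw new IllegalStateException("sketch covers only the root and one level");
        }
        filesInCurrentDir++;
        // Radix 36 yields the digits 0-9 followed by a-z.
        return dirIndex == 0 ? "" : Integer.toString(dirIndex - 1, SUBDIRS_PER_LEVEL);
    }

    public static void main(String[] args) {
        CacheDirSketch sketch = new CacheDirSketch(40); // 4 files + 36 sub-directories
        for (int i = 0; i < 6; i++) {
            System.out.println("file " + i + " -> \"" + sketch.nextRelativePath() + "\"");
        }
    }
}
```

With a limit of 40, the first four calls return the empty string (the filecache root itself) and the fifth call returns "0", matching the example in the description.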
+ + yarn.nodemanager.local-cache.max-files-per-directory + 8192 + + + Address where the localizer IPC is. yarn.nodemanager.localizer.address 0.0.0.0:8040 Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Wed Apr 3 05:01:35 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -35,6 +36,11 @@ interface LocalResourcesTracker boolean remove(LocalizedResource req, DeletionService delService); + Path getPathForLocalization(LocalResourceRequest req, Path localDirPath); + String getUser(); + // TODO: Remove this in favour of EventHandler.handle + void localizationCompleted(LocalResourceRequest req, boolean success); + } Modified: 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Wed Apr 3 05:01:35 2013 @@ -26,12 +26,13 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; + /** * A collection of {@link LocalizedResource}s all of same @@ -49,17 +50,43 @@ class LocalResourcesTrackerImpl implemen private final String user; private final Dispatcher dispatcher; private final ConcurrentMap localrsrc; + private Configuration conf; + /* + * This flag controls 
whether this resource tracker uses hierarchical + * directories or not. For PRIVATE and PUBLIC resource trackers it + * will be set whereas for APPLICATION resource tracker it would + * be false. + */ + private final boolean useLocalCacheDirectoryManager; + private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers; + /* + * It is used to keep track of resource into hierarchical directory + * while it is getting downloaded. It is useful for reference counting + * in case resource localization fails. + */ + private ConcurrentHashMap<LocalResourceRequest, Path> + inProgressLocalResourcesMap; - public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) { + public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + boolean useLocalCacheDirectoryManager, Configuration conf) { this(user, dispatcher, - new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>()); + new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>(), + useLocalCacheDirectoryManager, conf); } LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, - ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) { + ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc, + boolean useLocalCacheDirectoryManager, Configuration conf) { this.user = user; this.dispatcher = dispatcher; this.localrsrc = localrsrc; + this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager; + if ( this.useLocalCacheDirectoryManager) { + directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>(); + inProgressLocalResourcesMap = + new ConcurrentHashMap<LocalResourceRequest, Path>(); + } + this.conf = conf; } @Override @@ -73,6 +100,7 @@ class LocalResourcesTrackerImpl implemen LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); localrsrc.remove(req); + decrementFileCountForLocalCacheDirectory(req, rsrc); rsrc = null; } if (null == rsrc) { @@ -90,7 +118,52 @@ class LocalResourcesTrackerImpl implemen rsrc.handle(event); } - /** + /* + * Update the file-count statistics for a local cache-directory. + * This will retrieve the localized path for the resource from + * 1) inProgressRsrcMap if the resource was under localization and it + * failed. 
+ * 2) LocalizedResource if the resource is already localized. + * From this path it will identify the local directory under which the + * resource was localized. Then rest of the path will be used to decrement + * file count for the HierarchicalSubDirectory pointing to this relative + * path. + */ + private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req, + LocalizedResource rsrc) { + if ( useLocalCacheDirectoryManager) { + Path rsrcPath = null; + if (inProgressLocalResourcesMap.containsKey(req)) { + // This happens when localization of a resource fails. + rsrcPath = inProgressLocalResourcesMap.remove(req); + } else if (rsrc != null && rsrc.getLocalPath() != null) { + rsrcPath = rsrc.getLocalPath().getParent().getParent(); + } + if (rsrcPath != null) { + Path parentPath = new Path(rsrcPath.toUri().getRawPath()); + while (!directoryManagers.containsKey(parentPath)) { + parentPath = parentPath.getParent(); + if ( parentPath == null) { + return; + } + } + if ( parentPath != null) { + String parentDir = parentPath.toUri().getRawPath().toString(); + LocalCacheDirectoryManager dir = directoryManagers.get(parentPath); + String rsrcDir = rsrcPath.toUri().getRawPath(); + if (rsrcDir.equals(parentDir)) { + dir.decrementFileCountForPath(""); + } else { + dir.decrementFileCountForPath( + rsrcDir.substring( + parentDir.length() + 1)); + } + } + } + } + } + +/** * This module checks if the resource which was localized is already present * or not * @@ -100,7 +173,8 @@ class LocalResourcesTrackerImpl implemen public boolean isResourcePresent(LocalizedResource rsrc) { boolean ret = true; if (rsrc.getState() == ResourceState.LOCALIZED) { - File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString()); + File file = new File(rsrc.getLocalPath().toUri().getRawPath(). 
+ toString()); if (!file.exists()) { ret = false; } @@ -133,11 +207,11 @@ class LocalResourcesTrackerImpl implemen if (ResourceState.LOCALIZED.equals(rsrc.getState())) { delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); } + decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc); return true; } } - /** * Returns the path up to the random directory component. */ @@ -163,4 +237,50 @@ class LocalResourcesTrackerImpl implemen public Iterator iterator() { return localrsrc.values().iterator(); } -} + + /** + * @return {@link Path} absolute path for localization which includes local + * directory path and the relative hierarchical path (if use local + * cache directory manager is enabled) + * + * @param {@link LocalResourceRequest} Resource localization request to + * localize the resource. + * @param {@link Path} local directory path + */ + @Override + public Path + getPathForLocalization(LocalResourceRequest req, Path localDirPath) { + if (useLocalCacheDirectoryManager && localDirPath != null) { + + if (!directoryManagers.containsKey(localDirPath)) { + directoryManagers.putIfAbsent(localDirPath, + new LocalCacheDirectoryManager(conf)); + } + LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath); + + Path rPath = localDirPath; + String hierarchicalPath = dir.getRelativePathForLocalization(); + // For most of the scenarios we will get root path only which + // is an empty string + if (!hierarchicalPath.isEmpty()) { + rPath = new Path(localDirPath, hierarchicalPath); + } + inProgressLocalResourcesMap.put(req, rPath); + return rPath; + } else { + return localDirPath; + } + } + + @Override + public void localizationCompleted(LocalResourceRequest req, + boolean success) { + if (useLocalCacheDirectoryManager) { + if (!success) { + decrementFileCountForLocalCacheDirectory(req, null); + } else { + inProgressLocalResourcesMap.remove(req); + } + } + } +} \ No newline at end of file Modified: 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Apr 3 05:01:35 2013 @@ -64,6 +64,7 @@ import org.apache.hadoop.security.Creden import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -130,7 +131,7 @@ public class ResourceLocalizationService private RecordFactory recordFactory; private final ScheduledExecutorService cacheCleanup; - private final LocalResourcesTracker publicRsrc; + private LocalResourcesTracker publicRsrc; private LocalDirsHandlerService dirsHandler; @@ -158,7 +159,6 @@ public class ResourceLocalizationService this.delService = delService; this.dirsHandler = dirsHandler; - this.publicRsrc = new 
LocalResourcesTrackerImpl(null, dispatcher); this.cacheCleanup = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") @@ -173,8 +173,26 @@ public class ResourceLocalizationService } } + private void validateConf(Configuration conf) { + int perDirFileLimit = + conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, + YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY); + if (perDirFileLimit <= 36) { + LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY + + " parameter is configured with very low value."); + throw new YarnException( + YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY + + " parameter is configured with a value less than 37."); + } else { + LOG.info("per directory file limit = " + perDirFileLimit); + } + } + @Override public void init(Configuration conf) { + this.validateConf(conf); + this.publicRsrc = + new LocalResourcesTrackerImpl(null, dispatcher, true, conf); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { @@ -212,6 +230,7 @@ public class ResourceLocalizationService YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); + localizerTracker = createLocalizerTracker(conf); addService(localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker); @@ -306,15 +325,17 @@ public class ResourceLocalizationService private void handleInitApplicationResources(Application app) { // 0) Create application tracking structs String userName = app.getUser(); - privateRsrc.putIfAbsent(userName, - new LocalResourcesTrackerImpl(userName, dispatcher)); - if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()), - new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) { + privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, + dispatcher, false, super.getConfig())); + if (null != 
appRsrc.putIfAbsent( + ConverterUtils.toString(app.getAppId()), + new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super + .getConfig()))) { LOG.warn("Initializing application " + app + " already present"); assert false; // TODO: FIXME assert doesn't help // ^ The condition is benign. Tests should fail and it - // should appear in logs, but it's an internal error - // that should have no effect on applications + // should appear in logs, but it's an internal error + // that should have no effect on applications } // 1) Signal container init // @@ -620,6 +641,13 @@ public class ResourceLocalizationService Path publicDirDestPath = dirsHandler.getLocalPathForWrite( "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, ContainerLocalizer.getEstimatedSize(resource), true); + Path hierarchicalPath = + publicRsrc.getPathForLocalization(key, publicDirDestPath); + if (!hierarchicalPath.equals(publicDirDestPath)) { + publicDirDestPath = hierarchicalPath; + DiskChecker.checkDir( + new File(publicDirDestPath.toUri().getPath())); + } pending.put(queue.submit(new FSDownload( lfs, null, conf, publicDirDestPath, resource, new Random())), request); @@ -654,19 +682,21 @@ public class ResourceLocalizationService assoc.getResource().handle( new ResourceLocalizedEvent(key, local, FileUtil.getDU(new File(local.toUri())))); + publicRsrc.localizationCompleted(key, true); synchronized (attempts) { attempts.remove(key); } } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); + LocalResourceRequest req = assoc.getResource().getRequest(); dispatcher.getEventHandler().handle( new ContainerResourceFailedEvent( assoc.getContext().getContainerId(), - assoc.getResource().getRequest(), e.getCause())); + req, e.getCause())); + publicRsrc.localizationCompleted(req, false); List reqs; synchronized (attempts) { - LocalResourceRequest req = assoc.getResource().getRequest(); reqs = attempts.get(req); if (null == reqs) { LOG.error("Missing 
pending list for " + req); @@ -1003,4 +1033,4 @@ public class ResourceLocalizationService del.delete(null, dirPath, new Path[] {}); } -} +} \ No newline at end of file Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Wed Apr 3 05:01:35 2013 @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -50,17 +51,17 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; -import 
org.mortbay.log.Log; public class TestLocalResourcesTrackerImpl { - @Test + @Test(timeout=10000) @SuppressWarnings("unchecked") public void test() { String user = "testuser"; DrainDispatcher dispatcher = null; try { - dispatcher = createDispatcher(new Configuration()); + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); EventHandler localizerEventHandler = mock(EventHandler.class); EventHandler containerEventHandler = @@ -86,7 +87,8 @@ public class TestLocalResourcesTrackerIm localrsrc.put(req1, lr1); localrsrc.put(req2, lr2); LocalResourcesTracker tracker = - new LocalResourcesTrackerImpl(user, dispatcher, localrsrc); + new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false, + conf); ResourceEvent req11Event = new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); @@ -152,13 +154,14 @@ public class TestLocalResourcesTrackerIm } } - @Test + @Test(timeout=10000) @SuppressWarnings("unchecked") public void testConsistency() { String user = "testuser"; DrainDispatcher dispatcher = null; try { - dispatcher = createDispatcher(new Configuration()); + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); EventHandler localizerEventHandler = mock(EventHandler.class); EventHandler containerEventHandler = mock(EventHandler.class); dispatcher.register(LocalizerEventType.class, localizerEventHandler); @@ -172,7 +175,7 @@ public class TestLocalResourcesTrackerIm ConcurrentMap localrsrc = new ConcurrentHashMap(); localrsrc.put(req1, lr1); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - dispatcher, localrsrc); + dispatcher, localrsrc, false, conf); ResourceEvent req11Event = new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); @@ -221,6 +224,113 @@ public class TestLocalResourcesTrackerIm } } + @Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testHierarchicalLocalCacheDirectories() { + String user = "testuser"; + DrainDispatcher dispatcher = 
null; + try { + Configuration conf = new Configuration(); + // setting per directory file limit to 1. + conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37"); + dispatcher = createDispatcher(conf); + + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + DeletionService mockDelService = mock(DeletionService.class); + + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + dispatcher, localrsrc, true, conf); + + // This is a random path. NO File creation will take place at this place. + Path localDir = new Path("/tmp"); + + // Container 1 needs lr1 resource + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + + // Container 1 requests lr1 to be localized + ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, + LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent1); + + // Simulate the process of localization of lr1 + Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + // Simulate lr1 getting localized + ResourceLocalizedEvent rle = + new ResourceLocalizedEvent(lr1, + new Path(hierarchicalPath1.toUri().toString() + + Path.SEPARATOR + "file1"), 120); + tracker.handle(rle); + // Localization successful. + tracker.localizationCompleted(lr1, true); + + LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3, + LocalResourceVisibility.PUBLIC); + Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); + // localization failed. 
+ tracker.localizationCompleted(lr2, false); + + /* + * The path returned for two localization should be different because we + * are limiting one file per sub-directory. + */ + Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2); + + LocalResourceRequest lr3 = createLocalResourceRequest(user, 2, 2, + LocalResourceVisibility.PUBLIC); + ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3, + LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent3); + Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir); + tracker.localizationCompleted(lr3, true); + + // Verifying that path created is inside the subdirectory + Assert.assertEquals(hierarchicalPath3.toUri().toString(), + hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0"); + + // Container 1 releases resource lr1 + ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1); + tracker.handle(relEvent1); + + // Validate the file counts now + int resources = 0; + Iterator iter = tracker.iterator(); + while (iter.hasNext()) { + iter.next(); + resources++; + } + // There should be only two resources lr1 and lr3 now. + Assert.assertEquals(2, resources); + + // Now simulate cache cleanup - removes unused resources. 
+ iter = tracker.iterator(); + while (iter.hasNext()) { + LocalizedResource rsrc = iter.next(); + if (rsrc.getRefCount() == 0) { + Assert.assertTrue(tracker.remove(rsrc, mockDelService)); + resources--; + } + } + // lr1 is not used by anyone and will be removed, only lr3 will hang + // around + Assert.assertEquals(1, resources); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + private boolean createdummylocalizefile(Path path) { boolean ret = false; File file = new File(path.toUri().getRawPath().toString()); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1463824&r1=1463823&r2=1463824&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Wed Apr 3 05:01:35 2013 @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.no import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceType; import 
org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -76,10 +77,11 @@ public class TestResourceRetention { LocalResourcesTracker createMockTracker(String user, final long rsrcSize, long nRsrcs, long timestamp, long tsstep) { + Configuration conf = new Configuration(); ConcurrentMap trackerResources = new ConcurrentHashMap(); LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null, - trackerResources)); + trackerResources, false, conf)); for (int i = 0; i < nRsrcs; ++i) { final LocalResourceRequest req = new LocalResourceRequest( new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,