Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 68AF317E9C for ; Wed, 6 May 2015 21:20:29 +0000 (UTC) Received: (qmail 99352 invoked by uid 500); 6 May 2015 21:20:29 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 99286 invoked by uid 500); 6 May 2015 21:20:29 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 99277 invoked by uid 99); 6 May 2015 21:20:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 May 2015 21:20:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 151A1DFF69; Wed, 6 May 2015 21:20:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rkanter@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter) Date: Wed, 6 May 2015 21:20:29 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 f8488b5ae -> 28b40aca9 YARN-3491. PublicLocalizer#addResource is too slow. 
(zxu via rkanter) (cherry picked from commit b72507810aece08e17ab4b5aae1f7eae1fe98609) Conflicts: hadoop-yarn-project/CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28b40aca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28b40aca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28b40aca Branch: refs/heads/branch-2 Commit: 28b40aca98225383677b4bf4d3051a5c8f9be338 Parents: f8488b5 Author: Robert Kanter Authored: Wed May 6 14:19:06 2015 -0700 Committer: Robert Kanter Committed: Wed May 6 14:20:08 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../server/nodemanager/DirectoryCollection.java | 33 ++++++++++++- .../nodemanager/LocalDirsHandlerService.java | 17 +++++++ .../localizer/ResourceLocalizationService.java | 51 +++++++++----------- .../nodemanager/TestDirectoryCollection.java | 47 ++++++++++++++++++ .../TestResourceLocalizationService.java | 28 ++++++++--- 6 files changed, 142 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/28b40aca/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 94301e7..a6749a3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -135,6 +135,8 @@ Release 2.8.0 - UNRELEASED YARN-2980. Move health check script related functionality to hadoop-common (Varun Saxena via aw) + YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter) + OPTIMIZATIONS YARN-3339. 
TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/28b40aca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index 2658918..32046c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -42,9 +42,12 @@ import org.apache.hadoop.util.DiskChecker; /** * Manages a list of local storage directories. */ -class DirectoryCollection { +public class DirectoryCollection { private static final Log LOG = LogFactory.getLog(DirectoryCollection.class); + /** + * The enum defines disk failure type. + */ public enum DiskErrorCause { DISK_FULL, OTHER } @@ -60,6 +63,13 @@ class DirectoryCollection { } /** + * The interface provides a callback when localDirs is changed. + */ + public interface DirsChangeListener { + void onDirsChanged(); + } + + /** * Returns a merged list which contains all the elements of l1 and l2 * @param l1 the first list to be included * @param l2 the second list to be included @@ -84,6 +94,8 @@ class DirectoryCollection { private int goodDirsDiskUtilizationPercentage; + private Set dirsChangeListeners; + /** * Create collection for the directories specified. No check for free space. 
* @@ -154,6 +166,20 @@ class DirectoryCollection { : utilizationPercentageCutOff); diskUtilizationSpaceCutoff = utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; + + dirsChangeListeners = new HashSet(); + } + + synchronized void registerDirsChangeListener( + DirsChangeListener listener) { + if (dirsChangeListeners.add(listener)) { + listener.onDirsChanged(); + } + } + + synchronized void deregisterDirsChangeListener( + DirsChangeListener listener) { + dirsChangeListeners.remove(listener); } /** @@ -280,6 +306,11 @@ class DirectoryCollection { } } setGoodDirsDiskUtilizationPercentage(); + if (setChanged) { + for (DirsChangeListener listener : dirsChangeListeners) { + listener.onDirsChanged(); + } + } return setChanged; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28b40aca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 493571d..57d4395 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -38,6 +38,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import 
org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; /** @@ -192,6 +193,22 @@ public class LocalDirsHandlerService extends AbstractService { super.serviceStop(); } + public void registerLocalDirsChangeListener(DirsChangeListener listener) { + localDirs.registerDirsChangeListener(listener); + } + + public void registerLogDirsChangeListener(DirsChangeListener listener) { + logDirs.registerDirsChangeListener(listener); + } + + public void deregisterLocalDirsChangeListener(DirsChangeListener listener) { + localDirs.deregisterDirsChangeListener(listener); + } + + public void deregisterLogDirsChangeListener(DirsChangeListener listener) { + logDirs.deregisterDirsChangeListener(listener); + } + /** * @return the good/valid local directories based on disks' health */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/28b40aca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 17ea1a9..603e795 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; @@ -161,6 +162,8 @@ public class ResourceLocalizationService extends CompositeService private LocalResourcesTracker publicRsrc; private LocalDirsHandlerService dirsHandler; + private DirsChangeListener localDirsChangeListener; + private DirsChangeListener logDirsChangeListener; private Context nmContext; /** @@ -254,6 +257,18 @@ public class ResourceLocalizationService extends CompositeService localizerTracker = createLocalizerTracker(conf); addService(localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker); + localDirsChangeListener = new DirsChangeListener() { + @Override + public void onDirsChanged() { + checkAndInitializeLocalDirs(); + } + }; + logDirsChangeListener = new DirsChangeListener() { + @Override + public void onDirsChanged() { + initializeLogDirs(lfs); + } + }; super.serviceInit(conf); } @@ -345,6 +360,8 @@ public class ResourceLocalizationService extends CompositeService server.getListenerAddress()); LOG.info("Localizer started on port " + server.getPort()); super.serviceStart(); + dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener); + dirsHandler.registerLogDirsChangeListener(logDirsChangeListener); } 
LocalizerTracker createLocalizerTracker(Configuration conf) { @@ -375,6 +392,8 @@ public class ResourceLocalizationService extends CompositeService @Override public void serviceStop() throws Exception { + dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener); + dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener); if (server != null) { server.stop(); } @@ -814,11 +833,6 @@ public class ResourceLocalizationService extends CompositeService DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } - // In case this is not a newly initialized nm state, ensure - // initialized local/log dirs similar to LocalizerRunner - getInitializedLocalDirs(); - getInitializedLogDirs(); - // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { @@ -1120,8 +1134,6 @@ public class ResourceLocalizationService extends CompositeService // 1) write credentials to private dir writeCredentials(nmPrivateCTokensPath); // 2) exec initApplication and wait - List localDirs = getInitializedLocalDirs(); - List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, context.getUser(), @@ -1387,13 +1399,12 @@ public class ResourceLocalizationService extends CompositeService } /** - * Synchronized method to get a list of initialized local dirs. Method will - * check each local dir to ensure it has been setup correctly and will attempt - * to fix any issues it finds. - * - * @return list of initialized local dirs + * Check each local dir to ensure it has been setup correctly and will + * attempt to fix any issues it finds. 
+ * @return void */ - synchronized private List getInitializedLocalDirs() { + @VisibleForTesting + void checkAndInitializeLocalDirs() { List dirs = dirsHandler.getLocalDirs(); List checkFailedDirs = new ArrayList(); for (String dir : dirs) { @@ -1415,7 +1426,6 @@ public class ResourceLocalizationService extends CompositeService throw new YarnRuntimeException(msg, e); } } - return dirs; } private boolean checkLocalDir(String localDir) { @@ -1463,17 +1473,4 @@ public class ResourceLocalizationService extends CompositeService localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); return localDirPathFsPermissionsMap; } - - /** - * Synchronized method to get a list of initialized log dirs. Method will - * check each local dir to ensure it has been setup correctly and will attempt - * to fix any issues it finds. - * - * @return list of initialized log dirs - */ - synchronized private List getInitializedLogDirs() { - List dirs = dirsHandler.getLogDirs(); - initializeLogDirs(lfs); - return dirs; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28b40aca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index e4525a5..2fd89c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -258,4 +259,50 @@ public class TestDirectoryCollection { Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta); Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff()); } + + @Test + public void testDirsChangeListener() { + DirsChangeListenerTest listener1 = new DirsChangeListenerTest(); + DirsChangeListenerTest listener2 = new DirsChangeListenerTest(); + DirsChangeListenerTest listener3 = new DirsChangeListenerTest(); + + String dirA = new File(testDir, "dirA").getPath(); + String[] dirs = { dirA }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 0); + Assert.assertEquals(listener2.num, 0); + Assert.assertEquals(listener3.num, 0); + dc.registerDirsChangeListener(listener1); + dc.registerDirsChangeListener(listener2); + dc.registerDirsChangeListener(listener3); + Assert.assertEquals(listener1.num, 1); + Assert.assertEquals(listener2.num, 1); + Assert.assertEquals(listener3.num, 1); + + dc.deregisterDirsChangeListener(listener3); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 2); + Assert.assertEquals(listener2.num, 2); + Assert.assertEquals(listener3.num, 1); + + dc.deregisterDirsChangeListener(listener2); + dc.setDiskUtilizationPercentageCutoff(100.0F); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 
3); + Assert.assertEquals(listener2.num, 2); + Assert.assertEquals(listener3.num, 1); + } + + static class DirsChangeListenerTest implements DirsChangeListener { + public int num = 0; + public DirsChangeListenerTest() { + } + @Override + public void onDirsChanged() { + num++; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28b40aca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 2edaf45..07001ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1098,7 +1098,6 @@ public class TestResourceLocalizationService { isA(Configuration.class)); spyService.init(conf); - spyService.start(); final FsPermission defaultPerm = new FsPermission((short)0755); @@ -1110,6 +1109,8 @@ public class TestResourceLocalizationService { .mkdir(eq(publicCache),eq(defaultPerm), eq(true)); } + spyService.start(); + final String user = "user0"; // init application final Application app = mock(Application.class); @@ -1131,21 +1132,32 @@ public class 
TestResourceLocalizationService { r.setSeed(seed); // Queue up public resource localization - final LocalResource pubResource = getPublicMockedResource(r); - final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + final LocalResource pubResource1 = getPublicMockedResource(r); + final LocalResourceRequest pubReq1 = + new LocalResourceRequest(pubResource1); + + LocalResource pubResource2 = null; + do { + pubResource2 = getPublicMockedResource(r); + } while (pubResource2 == null || pubResource2.equals(pubResource1)); + // above call to make sure we don't get identical resources. + final LocalResourceRequest pubReq2 = + new LocalResourceRequest(pubResource2); + + Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>(); + pubRsrcs.add(pubReq1); + pubRsrcs.add(pubReq2); Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req = new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>(); - req.put(LocalResourceVisibility.PUBLIC, - Collections.singletonList(pubReq)); - - Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>(); - pubRsrcs.add(pubReq); + req.put(LocalResourceVisibility.PUBLIC, pubRsrcs); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); dispatcher.await(); + verify(spyService, times(1)).checkAndInitializeLocalDirs(); + // verify directory creation for (Path p : localDirs) { p = new Path((new URI(p.toString())).getPath());