From: sunilg@apache.org
To: common-commits@hadoop.apache.org
Message-Id: <9eef53fe338146cdaf808b817009fcf2@git.apache.org>
Subject: hadoop git commit: YARN-5333. Some recovered apps are put into default queue when RM HA. Contributed by Jun Gong.
Date: Sun, 4 Jun 2017 05:22:47 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 d94638785 -> 13b4a6b2b


YARN-5333. Some recovered apps are put into default queue when RM HA. Contributed by Jun Gong.
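In short, the patch reorders AdminService#transitionToActive so that refreshAll() runs before rm.transitionToActive(): the scheduler is reinitialized with the current queue configuration first, and only then are applications recovered, so recovered apps land in their configured queues instead of the default queue. It also extracts argument-less helpers (refreshQueues(), refreshNodes(), refreshSuperUserGroupsConfiguration(), refreshUserToGroupsMappings(), refreshServiceAcls(), refreshClusterMaxPriority()) that refreshAll() calls directly after a single checkAcls("refreshAll") check, and adds a FairScheduler test for the HA failover case. A condensed sketch of the reordered method follows; it is simplified for illustration (the signature is abbreviated, and the audit-failure logging and RMFatalEvent dispatch on refresh failure are omitted), and the diff below is authoritative:

    // Sketch only: condensed from the AdminService diff below, not the exact committed code.
    public synchronized void transitionToActive(
        HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
      UserGroupInformation user = checkAccess("transitionToActive");
      checkHaStateChange(reqInfo);

      try {
        // Refresh queues, nodes, user/group mappings and ACLs first, so the
        // scheduler already reflects the current allocation file.
        refreshAll();
      } catch (Exception e) {
        throw new ServiceFailedException(
            "Error on refreshAll during transition to Active", e);
      }

      try {
        // Only now transition to active; app recovery runs against the
        // refreshed queues instead of falling back to the default queue.
        rm.transitionToActive();
      } catch (Exception e) {
        throw new ServiceFailedException(
            "Error when transitioning to Active mode", e);
      }

      RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
          "RM");
    }

The new private helpers let refreshAll() refresh directly instead of calling the public RPC methods with *Request.newInstance() objects, which is also why the DecommissionType import is dropped from AdminService.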
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13b4a6b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13b4a6b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13b4a6b2

Branch: refs/heads/branch-2.8
Commit: 13b4a6b2b9aa626d473e1d5f9740ec53fc5603cd
Parents: d946387
Author: Sunil G
Authored: Sun Jun 4 10:46:47 2017 +0530
Committer: Sunil G
Committed: Sun Jun 4 10:46:47 2017 +0530

----------------------------------------------------------------------
 .../server/resourcemanager/AdminService.java    | 118 ++++++++++++-------
 .../scheduler/fair/TestFairScheduler.java       |  72 ++++++++++-
 2 files changed, 142 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13b4a6b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 74be9cd..d8dbd53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.DecommissionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.conf.HAUtil;
@@ -301,15 +300,7 @@ public class AdminService extends CompositeService implements
     UserGroupInformation user = checkAccess("transitionToActive");
     checkHaStateChange(reqInfo);
 
-    try {
-      rm.transitionToActive();
-    } catch (Exception e) {
-      RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
-          "", "RM",
-          "Exception transitioning to active");
-      throw new ServiceFailedException(
-          "Error when transitioning to Active mode", e);
-    }
+
     try {
       // call all refresh*s for active RM to get the updated configurations.
      refreshAll();
@@ -319,10 +310,22 @@ public class AdminService extends CompositeService implements
           .getDispatcher()
           .getEventHandler()
           .handle(
-              new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
+              new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED,
+                  e));
       throw new ServiceFailedException(
-          "Error on refreshAll during transistion to Active", e);
+          "Error on refreshAll during transition to Active", e);
     }
+
+    try {
+      rm.transitionToActive();
+    } catch (Exception e) {
+      RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
+          "", "RM",
+          "Exception transitioning to active");
+      throw new ServiceFailedException(
+          "Error when transitioning to Active mode", e);
+    }
+
     RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
         "RM");
   }
@@ -384,12 +387,7 @@ public class AdminService extends CompositeService implements
     RefreshQueuesResponse response =
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
-      rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
-      // refresh the reservation system
-      ReservationSystem rSystem = rmContext.getReservationSystem();
-      if (rSystem != null) {
-        rSystem.reinitialize(getConfig(), rmContext);
-      }
+      refreshQueues();
       RMAuditLogger.logSuccess(user.getShortUserName(), operation,
           "AdminService");
       return response;
@@ -398,6 +396,15 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  private void refreshQueues() throws IOException, YarnException {
+    rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+    // refresh the reservation system
+    ReservationSystem rSystem = rmContext.getReservationSystem();
+    if (rSystem != null) {
+      rSystem.reinitialize(getConfig(), rmContext);
+    }
+  }
+
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws YarnException, StandbyException {
@@ -430,6 +437,13 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  private void refreshNodes() throws IOException, YarnException {
+    Configuration conf =
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    rmContext.getNodesListManager().refreshNodes(conf);
+  }
+
   @Override
   public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
       RefreshSuperUserGroupsConfigurationRequest request)
@@ -440,6 +454,16 @@ public class AdminService extends CompositeService implements
     checkRMStatus(user.getShortUserName(), operation,
         "refresh super-user-groups.");
 
+    refreshSuperUserGroupsConfiguration();
+    RMAuditLogger.logSuccess(user.getShortUserName(),
+        operation, "AdminService");
+
+    return recordFactory.newRecordInstance(
+        RefreshSuperUserGroupsConfigurationResponse.class);
+  }
+
+  private void refreshSuperUserGroupsConfiguration()
+      throws IOException, YarnException {
     // Accept hadoop common configs in core-site.xml as well as RM specific
     // configurations in yarn-site.xml
     Configuration conf =
@@ -448,11 +472,6 @@ public class AdminService extends CompositeService implements
             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
     RMServerUtils.processRMProxyUsersConf(conf);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
-    RMAuditLogger.logSuccess(user.getShortUserName(),
-        operation, "AdminService");
-
-    return recordFactory.newRecordInstance(
-        RefreshSuperUserGroupsConfigurationResponse.class);
   }
 
   @Override
@@ -464,10 +483,7 @@ public class AdminService extends CompositeService implements
     checkRMStatus(user.getShortUserName(), operation,
         "refresh user-groups.");
 
-    Groups.getUserToGroupsMappingService(
-        getConfiguration(new Configuration(false),
-            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
-
+    refreshUserToGroupsMappings();
     RMAuditLogger.logSuccess(user.getShortUserName(),
         operation, "AdminService");
 
@@ -475,6 +491,12 @@ public class AdminService extends CompositeService implements
         RefreshUserToGroupsMappingsResponse.class);
   }
 
+  private void refreshUserToGroupsMappings() throws IOException, YarnException {
+    Groups.getUserToGroupsMappingService(
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
+  }
+
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
@@ -517,6 +539,14 @@ public class AdminService extends CompositeService implements
     checkRMStatus(user.getShortUserName(), operation,
         "refresh Service ACLs.");
 
+    refreshServiceAcls();
+    RMAuditLogger.logSuccess(user.getShortUserName(), operation,
+        "AdminService");
+
+    return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
+  }
+
+  private void refreshServiceAcls() throws IOException, YarnException {
     PolicyProvider policyProvider = RMPolicyProvider.getInstance();
     Configuration conf =
         getConfiguration(new Configuration(false),
@@ -528,11 +558,6 @@ public class AdminService extends CompositeService implements
         conf, policyProvider);
     rmContext.getResourceTrackerService().refreshServiceAcls(
         conf, policyProvider);
-
-    RMAuditLogger.logSuccess(user.getShortUserName(), operation,
-        "AdminService");
-
-    return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
   }
 
   private synchronized void refreshServiceAcls(Configuration configuration,
@@ -663,18 +688,17 @@ public class AdminService extends CompositeService implements
 
   private void refreshAll() throws ServiceFailedException {
     try {
-      refreshQueues(RefreshQueuesRequest.newInstance());
-      refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL));
-      refreshSuperUserGroupsConfiguration(
-          RefreshSuperUserGroupsConfigurationRequest.newInstance());
-      refreshUserToGroupsMappings(
-          RefreshUserToGroupsMappingsRequest.newInstance());
+      checkAcls("refreshAll");
+      refreshQueues();
+      refreshNodes();
+      refreshSuperUserGroupsConfiguration();
+      refreshUserToGroupsMappings();
       if (getConfig().getBoolean(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
           false)) {
-        refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
+        refreshServiceAcls();
       }
-      refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance());
+      refreshClusterMaxPriority();
     } catch (Exception ex) {
       throw new ServiceFailedException("RefreshAll operation failed", ex);
     }
@@ -855,11 +879,7 @@ public class AdminService extends CompositeService implements
     checkRMStatus(user.getShortUserName(), operation, msg);
 
     try {
-      Configuration conf =
-          getConfiguration(new Configuration(false),
-              YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-
-      rmContext.getScheduler().setClusterMaxPriority(conf);
+      refreshClusterMaxPriority();
       RMAuditLogger
           .logSuccess(user.getShortUserName(), operation, "AdminService");
 
@@ -869,4 +889,12 @@ public class AdminService extends CompositeService implements
       throw logAndWrapException(e, user.getShortUserName(), operation, msg);
     }
   }
+
+  private void refreshClusterMaxPriority() throws IOException, YarnException {
+    Configuration conf =
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+
+    rmContext.getScheduler().setClusterMaxPriority(conf);
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13b4a6b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 62b55d7..aedcc3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -50,6 +50,7 @@ import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetworkTopology;
@@ -76,12 +77,14 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -89,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -113,7 +117,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -5371,4 +5374,67 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     resourceManager.getResourceScheduler().handle(nodeAddEvent1);
     return nm;
   }
+
+  @Test(timeout = 120000)
+  public void testRefreshQueuesWhenRMHA() throws Exception {
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
+    conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    HAServiceProtocol.StateChangeRequestInfo requestInfo =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues
+    MockRM rm1 = new MockRM(conf, null);
+    rm1.init(conf);
+    rm1.start();
+    rm1.getAdminService().transitionToStandby(requestInfo);
+
+    // 2. add a new queue "test_queue"
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"test_queue\">");
+    out.println("  <maxRunningApps>3</maxRunningApps>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 3. start a active RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.init(conf);
+    rm2.start();
+
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    nm.registerNode();
+
+    rm2.getAdminService().transitionToActive(requestInfo);
+
+    // 4. submit a app to the new added queue "test_queue"
+    RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue");
+    RMAppAttempt attempt0 = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId());
+    am0.registerAppAttempt();
+    assertEquals("root.test_queue", app.getQueue());
+
+    // 5. Transit rm1 to active, recover app
+    ((RMContextImpl) rm1.getRMContext()).setStateStore(memStore);
+    rm1.getAdminService().transitionToActive(requestInfo);
+    rm1.drainEvents();
+    assertEquals(1, rm1.getRMContext().getRMApps().size());
+    RMApp recoveredApp =
+        rm1.getRMContext().getRMApps().values().iterator().next();
+    assertEquals("root.test_queue", recoveredApp.getQueue());
+
+    rm1.stop();
+    rm2.stop();
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org