Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CD836200CFA for ; Fri, 28 Jul 2017 01:48:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CC01616B456; Thu, 27 Jul 2017 23:48:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D044616B434 for ; Fri, 28 Jul 2017 01:48:37 +0200 (CEST) Received: (qmail 88692 invoked by uid 500); 27 Jul 2017 23:48:34 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 87915 invoked by uid 99); 27 Jul 2017 23:48:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Jul 2017 23:48:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6C4C4F330A; Thu, 27 Jul 2017 23:48:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Thu, 27 Jul 2017 23:48:42 -0000 Message-Id: In-Reply-To: <39b717879f5b448f8772d14f4311e814@git.apache.org> References: <39b717879f5b448f8772d14f4311e814@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/34] hadoop git commit: YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S. archived-at: Thu, 27 Jul 2017 23:48:40 -0000 YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3153284 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3153284 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3153284 Branch: refs/heads/HDFS-7240 Commit: e3153284288d6cfa7a28511dfefe1c8a7d6b4eda Parents: 2054324 Author: Sunil G Authored: Mon Jul 24 10:59:01 2017 +0530 Committer: Sunil G Committed: Mon Jul 24 11:39:03 2017 +0530 ---------------------------------------------------------------------- ...ActiveStandbyElectorBasedElectorService.java | 12 +- .../server/resourcemanager/AdminService.java | 71 +++-- .../CuratorBasedElectorService.java | 10 +- .../resourcemanager/RMActiveServiceContext.java | 36 +-- .../server/resourcemanager/RMContextImpl.java | 312 ++++++++++--------- .../resourcemanager/RMServiceContext.java | 162 ++++++++++ .../server/resourcemanager/ResourceManager.java | 35 ++- .../metrics/TimelineServiceV2Publisher.java | 6 +- .../RMTimelineCollectorManager.java | 10 +- .../yarn/server/resourcemanager/MockRM.java | 2 +- .../resourcemanager/TestRMEmbeddedElector.java | 8 +- .../yarn/server/resourcemanager/TestRMHA.java | 16 +- .../TestSystemMetricsPublisherForV2.java | 13 +- 13 files changed, 451 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java index b59bc25..a8dcda4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java @@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); - private RMContext rmContext; + private ResourceManager rm; private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; @@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService @VisibleForTesting final Object zkDisconnectLock = new Object(); - ActiveStandbyElectorBasedElectorService(RMContext rmContext) { + ActiveStandbyElectorBasedElectorService(ResourceManager rm) { super(ActiveStandbyElectorBasedElectorService.class.getName()); - this.rmContext = rmContext; + this.rm = rm; } @Override @@ -140,7 +140,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToActive(req); + rm.getRMContext().getRMAdminService().transitionToActive(req); } catch (Exception e) { throw new ServiceFailedException("RM could not transition to Active", e); } @@ -151,7 +151,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToStandby(req); + rm.getRMContext().getRMAdminService().transitionToStandby(req); } catch (Exception e) { LOG.error("RM could not transition to Standby", e); } @@ -205,7 +205,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService @SuppressWarnings(value = "unchecked") @Override public void notifyFatalError(String errorMessage) { - rmContext.getDispatcher().getEventHandler().handle( + rm.getRMContext().getDispatcher().getEventHandler().handle( new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 7571765..3457ae3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements private static final Log LOG = LogFactory.getLog(AdminService.class); - private final RMContext rmContext; private final ResourceManager rm; private String rmId; @@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; - public AdminService(ResourceManager rm, RMContext rmContext) { + public AdminService(ResourceManager rm) { super(AdminService.class.getName()); this.rm = rm; - this.rmContext = rmContext; } @Override public void serviceInit(Configuration conf) throws Exception { autoFailoverEnabled = - rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf); + rm.getRMContext().isHAEnabled() + && HAUtil.isAutomaticFailoverEnabled(conf); masterServiceBindAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, @@ -189,7 +188,7 @@ public class AdminService extends CompositeService implements RMPolicyProvider.getInstance()); } - if (rmContext.isHAEnabled()) { + if (rm.getRMContext().isHAEnabled()) { RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, ProtobufRpcEngine.class); @@ -265,7 +264,7 @@ public class AdminService extends CompositeService implements } private synchronized boolean isRMActive() { - return HAServiceState.ACTIVE == rmContext.getHAServiceState(); + return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState(); } private void throwStandbyException() throws StandbyException { @@ -304,7 +303,7 @@ public class AdminService extends CompositeService implements // call all refresh*s for active RM to get the updated configurations. refreshAll(); } catch (Exception e) { - rmContext + rm.getRMContext() .getDispatcher() .getEventHandler() .handle( @@ -363,7 +362,7 @@ public class AdminService extends CompositeService implements @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); - HAServiceState haState = rmContext.getHAServiceState(); + HAServiceState haState = rm.getRMContext().getHAServiceState(); HAServiceStatus ret = new HAServiceStatus(haState); if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); @@ -395,11 +394,12 @@ public class AdminService extends CompositeService implements } private void refreshQueues() throws IOException, YarnException { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + rm.getRMContext().getScheduler().reinitialize(getConfig(), + this.rm.getRMContext()); // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); + ReservationSystem rSystem = rm.getRMContext().getReservationSystem(); if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); + rSystem.reinitialize(getConfig(), rm.getRMContext()); } } @@ -418,14 +418,14 @@ public class AdminService extends CompositeService implements YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); switch (request.getDecommissionType()) { case NORMAL: - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully( + rm.getRMContext().getNodesListManager().refreshNodesGracefully( conf, request.getDecommissionTimeout()); break; case FORCEFUL: - rmContext.getNodesListManager().refreshNodesForcefully(); + rm.getRMContext().getNodesListManager().refreshNodesForcefully(); break; } RMAuditLogger.logSuccess(user.getShortUserName(), operation, @@ -440,7 +440,7 @@ public class AdminService extends CompositeService implements Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); } @Override @@ -559,10 +559,11 @@ public class AdminService extends CompositeService implements Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); - rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); - rmContext.getApplicationMasterService().refreshServiceAcls( + rm.getRMContext().getClientRMService().refreshServiceAcls(conf, + policyProvider); + rm.getRMContext().getApplicationMasterService().refreshServiceAcls( conf, policyProvider); - rmContext.getResourceTrackerService().refreshServiceAcls( + rm.getRMContext().getResourceTrackerService().refreshServiceAcls( conf, policyProvider); } @@ -601,7 +602,7 @@ public class AdminService extends CompositeService implements // if any invalid nodes, throw exception instead of partially updating // valid nodes. for (NodeId nodeId : nodeIds) { - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.error("Resource update get failed on all nodes due to change " + "resource on an unrecognized node: " + nodeId); @@ -619,14 +620,14 @@ public class AdminService extends CompositeService implements for (Map.Entry entry : nodeResourceMap.entrySet()) { ResourceOption newResourceOption = entry.getValue(); NodeId nodeId = entry.getKey(); - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); allSuccess = false; } else { // update resource to RMNode - this.rmContext.getDispatcher().getEventHandler() + this.rm.getRMContext().getDispatcher().getEventHandler() .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); LOG.info("Update resource on node(" + node.getNodeID() + ") with resource(" + newResourceOption.toString() + ")"); @@ -661,7 +662,8 @@ public class AdminService extends CompositeService implements DynamicResourceConfiguration newConf; InputStream drInputStream = - this.rmContext.getConfigurationProvider().getConfigurationInputStream( + this.rm.getRMContext().getConfigurationProvider() + .getConfigurationInputStream( configuration, YarnConfiguration.DR_CONFIGURATION_FILE); if (drInputStream != null) { @@ -679,7 +681,7 @@ public class AdminService extends CompositeService implements updateNodeResource(updateRequest); } // refresh dynamic resource in ResourceTrackerService - this.rmContext.getResourceTrackerService(). + this.rm.getRMContext().getResourceTrackerService(). updateDynamicResourceConfiguration(newConf); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -692,7 +694,8 @@ public class AdminService extends CompositeService implements private synchronized Configuration getConfiguration(Configuration conf, String... confFileNames) throws YarnException, IOException { for (String confFileName : confFileNames) { - InputStream confFileInputStream = this.rmContext.getConfigurationProvider() + InputStream confFileInputStream = + this.rm.getRMContext().getConfigurationProvider() .getConfigurationInputStream(conf, confFileName); if (confFileInputStream != null) { conf.addResource(confFileInputStream); @@ -746,7 +749,7 @@ public class AdminService extends CompositeService implements AddToClusterNodeLabelsResponse response = recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager() + rm.getRMContext().getNodeLabelManager() .addToCluserNodeLabels(request.getNodeLabels()); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -769,7 +772,8 @@ public class AdminService extends CompositeService implements RemoveFromClusterNodeLabelsResponse response = recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + rm.getRMContext().getNodeLabelManager() + .removeFromClusterNodeLabels(request.getNodeLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); return response; @@ -805,19 +809,20 @@ public class AdminService extends CompositeService implements boolean isKnown = false; // both active and inactive nodes are recognized as known nodes if (requestedNode.getPort() != 0) { - if (rmContext.getRMNodes().containsKey(requestedNode) - || rmContext.getInactiveRMNodes().containsKey(requestedNode)) { + if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm + .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) { isKnown = true; } } else { - for (NodeId knownNode : rmContext.getRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; } } if (!isKnown) { - for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes() + .keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; @@ -841,7 +846,7 @@ public class AdminService extends CompositeService implements } } try { - rmContext.getNodeLabelManager().replaceLabelsOnNode( + rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode( request.getNodeToLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -878,7 +883,7 @@ public class AdminService extends CompositeService implements checkRMStatus(user.getShortUserName(), operation, msg); - Set decommissioningNodes = rmContext.getNodesListManager() + Set decommissioningNodes = rm.getRMContext().getNodesListManager() .checkForDecommissioningNodes(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -914,6 +919,6 @@ public class AdminService extends CompositeService implements getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getScheduler().setClusterMaxPriority(conf); + rm.getRMContext().getScheduler().setClusterMaxPriority(conf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java index bcdf48b..d7485f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java @@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService LogFactory.getLog(CuratorBasedElectorService.class); private LeaderLatch leaderLatch; private CuratorFramework curator; - private RMContext rmContext; private String latchPath; private String rmId; private ResourceManager rm; - public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) { + public CuratorBasedElectorService(ResourceManager rm) { super(CuratorBasedElectorService.class.getName()); - this.rmContext = rmContext; this.rm = rm; } @@ -102,7 +100,8 @@ public class CuratorBasedElectorService extends AbstractService public void isLeader() { LOG.info(rmId + "is elected leader, transitioning to active"); try { - rmContext.getRMAdminService().transitionToActive( + rm.getRMContext().getRMAdminService() + .transitionToActive( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { @@ -123,7 +122,8 @@ public class CuratorBasedElectorService extends AbstractService public void notLeader() { LOG.info(rmId + " relinquish leadership"); try { - rmContext.getRMAdminService().transitionToStandby( + rm.getRMContext().getRMAdminService() + .transitionToStandby( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 0e305a9..9dc5945 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -42,20 +42,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; /** - * The RMActiveServiceContext is the class that maintains all the - * RMActiveService contexts.This is expected to be used only by ResourceManager - * and RMContext. + * The RMActiveServiceContext is the class that maintains Active service + * context. Services that need to run only on the Active RM. This is expected to + * be used only by RMContext. */ @Private @Unstable @@ -94,7 +94,6 @@ public class RMActiveServiceContext { private NodesListManager nodesListManager; private ResourceTrackerService resourceTrackerService; private ApplicationMasterService applicationMasterService; - private RMTimelineCollectorManager timelineCollectorManager; private RMNodeLabelsManager nodeLabelManager; private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater; @@ -107,6 +106,7 @@ public class RMActiveServiceContext { private PlacementManager queuePlacementManager = null; private RMAppLifetimeMonitor rmAppLifetimeMonitor; + private QueueLimitCalculator queueLimitCalculator; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -374,19 +374,6 @@ public class RMActiveServiceContext { @Private @Unstable - public RMTimelineCollectorManager getRMTimelineCollectorManager() { - return timelineCollectorManager; - } - - @Private - @Unstable - public void setRMTimelineCollectorManager( - RMTimelineCollectorManager collectorManager) { - this.timelineCollectorManager = collectorManager; - } - - @Private - @Unstable public long getEpoch() { return this.epoch; } @@ -483,4 +470,17 @@ public class RMActiveServiceContext { public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { return this.rmAppLifetimeMonitor; } + + @Private + @Unstable + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + @Private + @Unstable + public void setContainerQueueLimitCalculator( + QueueLimitCalculator limitCalculator) { + this.queueLimitCalculator = limitCalculator; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index fb160c4..db2c585 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -57,37 +56,39 @@ import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; +/** + * RMContextImpl class holds two services context. + *
    + *
  • serviceContext : These services called as Always On services. + * Services that need to run always irrespective of the HA state of the RM.
  • + *
  • activeServiceCotext : Active services context. Services that need to run + * only on the Active RM.
  • + *
+ *

+ * Note: If any new service to be added to context, add it to a right + * context as per above description. + */ public class RMContextImpl implements RMContext { - private Dispatcher rmDispatcher; - - private boolean isHAEnabled; - - private HAServiceState haServiceState = - HAServiceProtocol.HAServiceState.INITIALIZING; - - private AdminService adminService; - - private ConfigurationProvider configurationProvider; + /** + * RM service contexts which runs through out RM life span. These are created + * once during start of RM. + */ + private RMServiceContext serviceContext; + /** + * RM Active service context. This will be recreated for every transition from + * ACTIVE->STANDBY. + */ private RMActiveServiceContext activeServiceContext; - private Configuration yarnConfiguration; - - private RMApplicationHistoryWriter rmApplicationHistoryWriter; - private SystemMetricsPublisher systemMetricsPublisher; - private EmbeddedElector elector; - - private QueueLimitCalculator queueLimitCalculator; - - private final Object haServiceStateLock = new Object(); - - private ResourceManager resourceManager; /** * Default constructor. To be used in conjunction with setter methods for * individual fields. */ public RMContextImpl() { + this.serviceContext = new RMServiceContext(); + this.activeServiceContext = new RMActiveServiceContext(); } @VisibleForTesting @@ -138,19 +139,154 @@ public class RMContextImpl implements RMContext { clientToAMTokenSecretManager, null); } + /** + * RM service contexts which runs through out JVM life span. These are created + * once during start of RM. + * @return serviceContext of RM + */ + @Private + @Unstable + public RMServiceContext getServiceContext() { + return serviceContext; + } + + /** + * Note: setting service context clears all services embedded with it. + * @param context rm service context + */ + @Private + @Unstable + public void setServiceContext(RMServiceContext context) { + this.serviceContext = context; + } + @Override - public Dispatcher getDispatcher() { - return this.rmDispatcher; + public ResourceManager getResourceManager() { + return serviceContext.getResourceManager(); + } + + public void setResourceManager(ResourceManager rm) { + serviceContext.setResourceManager(rm); + } + + @Override + public EmbeddedElector getLeaderElectorService() { + return serviceContext.getLeaderElectorService(); } @Override public void setLeaderElectorService(EmbeddedElector elector) { - this.elector = elector; + serviceContext.setLeaderElectorService(elector); } @Override - public EmbeddedElector getLeaderElectorService() { - return this.elector; + public Dispatcher getDispatcher() { + return serviceContext.getDispatcher(); + } + + void setDispatcher(Dispatcher dispatcher) { + serviceContext.setDispatcher(dispatcher); + } + + @Override + public AdminService getRMAdminService() { + return serviceContext.getRMAdminService(); + } + + void setRMAdminService(AdminService adminService) { + serviceContext.setRMAdminService(adminService); + } + + @Override + public boolean isHAEnabled() { + return serviceContext.isHAEnabled(); + } + + void setHAEnabled(boolean isHAEnabled) { + serviceContext.setHAEnabled(isHAEnabled); + } + + @Override + public HAServiceState getHAServiceState() { + return serviceContext.getHAServiceState(); + } + + void setHAServiceState(HAServiceState serviceState) { + serviceContext.setHAServiceState(serviceState); + } + + @Override + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return serviceContext.getRMApplicationHistoryWriter(); + } + + @Override + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + } + + @Override + public SystemMetricsPublisher getSystemMetricsPublisher() { + return serviceContext.getSystemMetricsPublisher(); + } + + @Override + public void setSystemMetricsPublisher( + SystemMetricsPublisher metricsPublisher) { + serviceContext.setSystemMetricsPublisher(metricsPublisher); + } + + @Override + public RMTimelineCollectorManager getRMTimelineCollectorManager() { + return serviceContext.getRMTimelineCollectorManager(); + } + + @Override + public void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager) { + serviceContext.setRMTimelineCollectorManager(timelineCollectorManager); + } + + @Override + public ConfigurationProvider getConfigurationProvider() { + return serviceContext.getConfigurationProvider(); + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + serviceContext.setConfigurationProvider(configurationProvider); + } + + @Override + public Configuration getYarnConfiguration() { + return serviceContext.getYarnConfiguration(); + } + + public void setYarnConfiguration(Configuration yarnConfiguration) { + serviceContext.setYarnConfiguration(yarnConfiguration); + } + + public String getHAZookeeperConnectionState() { + return serviceContext.getHAZookeeperConnectionState(); + } + + // ========================================================================== + /** + * RM Active service context. This will be recreated for every transition from + * ACTIVE to STANDBY. + * @return activeServiceContext of active services + */ + @Private + @Unstable + public RMActiveServiceContext getActiveServiceContext() { + return activeServiceContext; + } + + @Private + @Unstable + void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { + this.activeServiceContext = activeServiceContext; } @Override @@ -228,11 +364,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.getClientToAMTokenSecretManager(); } - @Override - public AdminService getRMAdminService() { - return this.adminService; - } - @VisibleForTesting public void setStateStore(RMStateStore store) { activeServiceContext.setStateStore(store); @@ -253,24 +384,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.getResourceTrackerService(); } - void setHAEnabled(boolean isHAEnabled) { - this.isHAEnabled = isHAEnabled; - } - - void setHAServiceState(HAServiceState serviceState) { - synchronized (haServiceStateLock) { - this.haServiceState = serviceState; - } - } - - void setDispatcher(Dispatcher dispatcher) { - this.rmDispatcher = dispatcher; - } - - void setRMAdminService(AdminService adminService) { - this.adminService = adminService; - } - @Override public void setClientRMService(ClientRMService clientRMService) { activeServiceContext.setClientRMService(clientRMService); @@ -348,18 +461,6 @@ public class RMContextImpl implements RMContext { activeServiceContext.setResourceTrackerService(resourceTrackerService); } - @Override - public boolean isHAEnabled() { - return isHAEnabled; - } - - @Override - public HAServiceState getHAServiceState() { - synchronized (haServiceStateLock) { - return haServiceState; - } - } - public void setWorkPreservingRecoveryEnabled(boolean enabled) { activeServiceContext.setWorkPreservingRecoveryEnabled(enabled); } @@ -369,50 +470,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.isWorkPreservingRecoveryEnabled(); } - @Override - public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { - return this.rmApplicationHistoryWriter; - } - - @Override - public void setRMTimelineCollectorManager( - RMTimelineCollectorManager timelineCollectorManager) { - activeServiceContext.setRMTimelineCollectorManager( - timelineCollectorManager); - } - - @Override - public RMTimelineCollectorManager getRMTimelineCollectorManager() { - return activeServiceContext.getRMTimelineCollectorManager(); - } - - @Override - public void setSystemMetricsPublisher( - SystemMetricsPublisher metricsPublisher) { - this.systemMetricsPublisher = metricsPublisher; - } - - @Override - public SystemMetricsPublisher getSystemMetricsPublisher() { - return this.systemMetricsPublisher; - } - - @Override - public void setRMApplicationHistoryWriter( - RMApplicationHistoryWriter rmApplicationHistoryWriter) { - this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; - - } - - @Override - public ConfigurationProvider getConfigurationProvider() { - return this.configurationProvider; - } - - public void setConfigurationProvider( - ConfigurationProvider configurationProvider) { - this.configurationProvider = configurationProvider; - } @Override public long getEpoch() { @@ -463,27 +520,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.getSystemCredentialsForApps(); } - @Private - @Unstable - public RMActiveServiceContext getActiveServiceContext() { - return activeServiceContext; - } - - @Private - @Unstable - void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { - this.activeServiceContext = activeServiceContext; - } - - @Override - public Configuration getYarnConfiguration() { - return this.yarnConfiguration; - } - - public void setYarnConfiguration(Configuration yarnConfiguration) { - this.yarnConfiguration=yarnConfiguration; - } - @Override public PlacementManager getQueuePlacementManager() { return this.activeServiceContext.getQueuePlacementManager(); @@ -496,12 +532,12 @@ public class RMContextImpl implements RMContext { @Override public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { - return this.queueLimitCalculator; + return activeServiceContext.getNodeManagerQueueLimitCalculator(); } public void setContainerQueueLimitCalculator( QueueLimitCalculator limitCalculator) { - this.queueLimitCalculator = limitCalculator; + activeServiceContext.setContainerQueueLimitCalculator(limitCalculator); } @Override @@ -515,21 +551,5 @@ public class RMContextImpl implements RMContext { return this.activeServiceContext.getRMAppLifetimeMonitor(); } - public String getHAZookeeperConnectionState() { - if (elector == null) { - return "Could not find leader elector. Verify both HA and automatic " + - "failover are enabled."; - } else { - return elector.getZookeeperConnectionState(); - } - } - - @Override - public ResourceManager getResourceManager() { - return resourceManager; - } - - public void setResourceManager(ResourceManager rm) { - this.resourceManager = rm; - } + // Note: Read java doc before adding any services over here. } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java new file mode 100644 index 0000000..45c6166 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.yarn.conf.ConfigurationProvider; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; + +/** + * RMServiceContext class maintains "Always On" services. Services that need to + * run always irrespective of the HA state of the RM. This is created during + * initialization of RMContextImpl. + *

+ * Note: If any services to be added in this class, make sure service + * will be running always irrespective of the HA state of the RM + */ +@Private +@Unstable +public class RMServiceContext { + + private Dispatcher rmDispatcher; + private boolean isHAEnabled; + private HAServiceState haServiceState = + HAServiceProtocol.HAServiceState.INITIALIZING; + private AdminService adminService; + private ConfigurationProvider configurationProvider; + private Configuration yarnConfiguration; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; + private SystemMetricsPublisher systemMetricsPublisher; + private EmbeddedElector elector; + private final Object haServiceStateLock = new Object(); + private ResourceManager resourceManager; + private RMTimelineCollectorManager timelineCollectorManager; + + public ResourceManager getResourceManager() { + return resourceManager; + } + + public void setResourceManager(ResourceManager rm) { + this.resourceManager = rm; + } + + public ConfigurationProvider getConfigurationProvider() { + return this.configurationProvider; + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + this.configurationProvider = configurationProvider; + } + + public Dispatcher getDispatcher() { + return this.rmDispatcher; + } + + void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + + public EmbeddedElector getLeaderElectorService() { + return this.elector; + } + + public void setLeaderElectorService(EmbeddedElector embeddedElector) { + this.elector = embeddedElector; + } + + public AdminService getRMAdminService() { + return this.adminService; + } + + void setRMAdminService(AdminService service) { + this.adminService = service; + } + + void setHAEnabled(boolean rmHAEnabled) { + this.isHAEnabled = rmHAEnabled; + } + + public boolean isHAEnabled() { + return isHAEnabled; + } + + public HAServiceState getHAServiceState() { + synchronized (haServiceStateLock) { + return haServiceState; + } + } + + void setHAServiceState(HAServiceState serviceState) { + synchronized (haServiceStateLock) { + this.haServiceState = serviceState; + } + } + + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return this.rmApplicationHistoryWriter; + } + + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter applicationHistoryWriter) { + this.rmApplicationHistoryWriter = applicationHistoryWriter; + } + + public void setSystemMetricsPublisher( + SystemMetricsPublisher metricsPublisher) { + this.systemMetricsPublisher = metricsPublisher; + } + + public SystemMetricsPublisher getSystemMetricsPublisher() { + return this.systemMetricsPublisher; + } + + public Configuration getYarnConfiguration() { + return this.yarnConfiguration; + } + + public void setYarnConfiguration(Configuration yarnConfiguration) { + this.yarnConfiguration = yarnConfiguration; + } + + public RMTimelineCollectorManager getRMTimelineCollectorManager() { + return timelineCollectorManager; + } + + public void setRMTimelineCollectorManager( + RMTimelineCollectorManager collectorManager) { + this.timelineCollectorManager = collectorManager; + } + + public String getHAZookeeperConnectionState() { + if (elector == null) { + return "Could not find leader elector. Verify both HA and automatic " + + "failover are enabled."; + } else { + return elector.getZookeeperConnectionState(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index f727f55..b63b60d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -115,7 +115,6 @@ import org.eclipse.jetty.webapp.WebAppContext; import com.google.common.annotations.VisibleForTesting; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; @@ -345,9 +344,9 @@ public class ResourceManager extends CompositeService implements Recoverable { YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { this.curator = createAndStartCurator(conf); - elector = new CuratorBasedElectorService(rmContext, this); + elector = new CuratorBasedElectorService(this); } else { - elector = new ActiveStandbyElectorBasedElectorService(rmContext); + elector = new ActiveStandbyElectorBasedElectorService(this); } return elector; } @@ -497,7 +496,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } private RMTimelineCollectorManager createRMTimelineCollectorManager() { - return new RMTimelineCollectorManager(rmContext); + return new RMTimelineCollectorManager(this); } protected SystemMetricsPublisher createSystemMetricsPublisher() { @@ -508,7 +507,8 @@ public class ResourceManager extends CompositeService implements Recoverable { // we're dealing with the v.2.x publisher LOG.info("system metrics publisher with the timeline service V2 is " + "configured"); - publisher = new TimelineServiceV2Publisher(rmContext); + publisher = new TimelineServiceV2Publisher( + rmContext.getRMTimelineCollectorManager()); } else { // we're dealing with the v.1.x publisher LOG.info("system metrics publisher with the timeline service V1 is " + @@ -560,7 +560,6 @@ public class ResourceManager extends CompositeService implements Recoverable { private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; - private RMActiveServiceContext activeServiceContext; private boolean fromActive = false; private StandByTransitionRunnable standByTransitionRunnable; @@ -573,9 +572,6 @@ public class ResourceManager extends CompositeService implements Recoverable { protected void serviceInit(Configuration configuration) throws Exception { standByTransitionRunnable = new StandByTransitionRunnable(); - activeServiceContext = new RMActiveServiceContext(); - rmContext.setActiveServiceContext(activeServiceContext); - rmSecretManagerService = createRMSecretManagerService(); addService(rmSecretManagerService); @@ -1149,7 +1145,7 @@ public class ResourceManager extends CompositeService implements Recoverable { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); if (initialize) { - resetDispatcher(); + resetRMContext(); createAndInitActiveServices(true); } } @@ -1294,7 +1290,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected AdminService createAdminService() { - return new AdminService(this, rmContext); + return new AdminService(this); } protected RMSecretManagerService createRMSecretManagerService() { @@ -1417,17 +1413,24 @@ public class ResourceManager extends CompositeService implements Recoverable { return dispatcher; } - private void resetDispatcher() { + private void resetRMContext() { + RMContextImpl rmContextImpl = new RMContextImpl(); + // transfer service context to new RM service Context + rmContextImpl.setServiceContext(rmContext.getServiceContext()); + + // reset dispatcher Dispatcher dispatcher = setupDispatcher(); - ((Service)dispatcher).init(this.conf); - ((Service)dispatcher).start(); - removeService((Service)rmDispatcher); + ((Service) dispatcher).init(this.conf); + ((Service) dispatcher).start(); + removeService((Service) rmDispatcher); // Need to stop previous rmDispatcher before assigning new dispatcher // otherwise causes "AsyncDispatcher event handler" thread leak ((Service) rmDispatcher).stop(); rmDispatcher = dispatcher; addIfService(rmDispatcher); - rmContext.setDispatcher(rmDispatcher); + rmContextImpl.setDispatcher(dispatcher); + + rmContext = rmContextImpl; } private void setSchedulerRecoveryStartAndWaitTime(RMState state, http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index a8bf6bd..a3a2ebc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; @@ -75,9 +74,10 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { private RMTimelineCollectorManager rmTimelineCollectorManager; private boolean publishContainerEvents; - public TimelineServiceV2Publisher(RMContext rmContext) { + public TimelineServiceV2Publisher( + RMTimelineCollectorManager timelineCollectorManager) { super("TimelineserviceV2Publisher"); - rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager(); + rmTimelineCollectorManager = timelineCollectorManager; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java index 64c3749..c980458 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; @@ -41,16 +41,16 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager { private static final Log LOG = LogFactory.getLog(RMTimelineCollectorManager.class); - private RMContext rmContext; + private ResourceManager rm; - public RMTimelineCollectorManager(RMContext rmContext) { + public RMTimelineCollectorManager(ResourceManager resourceManager) { super(RMTimelineCollectorManager.class.getName()); - this.rmContext = rmContext; + this.rm = resourceManager; } @Override protected void doPostPut(ApplicationId appId, TimelineCollector collector) { - RMApp app = rmContext.getRMApps().get(appId); + RMApp app = rm.getRMContext().getRMApps().get(appId); if (app == null) { throw new YarnRuntimeException( "Unable to get the timeline collector context info for a " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 23009db..5a215e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -1055,7 +1055,7 @@ public class MockRM extends ResourceManager { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { @Override protected void startServer() { // override to not start rpc handler http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 1fe9bbe..140483a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -122,13 +122,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { throws IOException, InterruptedException { AdminService as = mock(AdminService.class); RMContext rc = mock(RMContext.class); + ResourceManager rm = mock(ResourceManager.class); Configuration myConf = new Configuration(conf); myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rm.getRMContext()).thenReturn(rc); when(rc.getRMAdminService()).thenReturn(as); - ActiveStandbyElectorBasedElectorService - ees = new ActiveStandbyElectorBasedElectorService(rc); + ActiveStandbyElectorBasedElectorService ees = + new ActiveStandbyElectorBasedElectorService(rm); ees.init(myConf); ees.enterNeutralMode(); @@ -290,7 +292,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { @Override protected EmbeddedElector createEmbeddedElector() { - return new ActiveStandbyElectorBasedElectorService(getRMContext()) { + return new ActiveStandbyElectorBasedElectorService(this) { @Override public void becomeActive() throws ServiceFailedException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index f807217..ec6b1e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -71,6 +71,7 @@ public class TestRMHA { private Log LOG = LogFactory.getLog(TestRMHA.class); private Configuration configuration; private MockRM rm = null; + private MockNM nm = null; private RMApp app = null; private RMAppAttempt attempt = null; private static final String STATE_ERR = @@ -135,7 +136,7 @@ public class TestRMHA { try { rm.getNewAppId(); - rm.registerNode("127.0.0.1:1", 2048); + nm = rm.registerNode("127.0.0.1:1", 2048); app = rm.submitApp(1024); attempt = app.getCurrentAppAttempt(); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); @@ -551,6 +552,17 @@ public class TestRMHA { verifyClusterMetrics(1, 1, 1, 1, 2048, 1); assertEquals(1, rm.getRMContext().getRMNodes().size()); assertEquals(1, rm.getRMContext().getRMApps().size()); + Assert.assertNotNull("Node not registered", nm); + + rm.adminService.transitionToStandby(requestInfo); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + // race condition causes to register/node heartbeat node even after service + // is stopping/stopped. New RMContext is being created on every transition + // to standby, so metrics should be 0 which indicates new context reference + // has taken. + nm.registerNode(); + verifyClusterMetrics(0, 0, 0, 0, 0, 0); // 3. Create new RM rm = new MockRM(conf, memStore) { @@ -592,7 +604,7 @@ public class TestRMHA { rm = new MockRM(configuration) { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { int counter = 0; @Override protected void setConfig(Configuration conf) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 2d40c91..ec09945 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -98,10 +99,12 @@ public class TestSystemMetricsPublisherForV2 { new Path(testRootDir.getAbsolutePath()), true); } + ResourceManager rm = mock(ResourceManager.class); RMContext rmContext = mock(RMContext.class); rmAppsMapInContext = new ConcurrentHashMap(); when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext); - rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext); + when(rm.getRMContext()).thenReturn(rmContext); + rmTimelineCollectorManager = new RMTimelineCollectorManager(rm); when(rmContext.getRMTimelineCollectorManager()).thenReturn( rmTimelineCollectorManager); @@ -113,7 +116,8 @@ public class TestSystemMetricsPublisherForV2 { dispatcher.init(conf); dispatcher.start(); - metricsPublisher = new TimelineServiceV2Publisher(rmContext) { + metricsPublisher = + new TimelineServiceV2Publisher(rmTimelineCollectorManager) { @Override protected Dispatcher getDispatcher() { return dispatcher; @@ -162,7 +166,7 @@ public class TestSystemMetricsPublisherForV2 { public void testSystemMetricPublisherInitialization() { @SuppressWarnings("resource") TimelineServiceV2Publisher publisher = - new TimelineServiceV2Publisher(mock(RMContext.class)); + new TimelineServiceV2Publisher(mock(RMTimelineCollectorManager.class)); try { Configuration conf = getTimelineV2Conf(); conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, @@ -174,7 +178,8 @@ public class TestSystemMetricsPublisherForV2 { publisher.stop(); - publisher = new TimelineServiceV2Publisher(mock(RMContext.class)); + publisher = new TimelineServiceV2Publisher( + mock(RMTimelineCollectorManager.class)); conf = getTimelineV2Conf(); publisher.init(conf); assertTrue("Expected to have registered event handlers and set ready to " --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org