hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [11/34] hadoop git commit: YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.
Date Thu, 27 Jul 2017 23:48:42 GMT
YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3153284
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3153284
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3153284

Branch: refs/heads/HDFS-7240
Commit: e3153284288d6cfa7a28511dfefe1c8a7d6b4eda
Parents: 2054324
Author: Sunil G <sunilg@apache.org>
Authored: Mon Jul 24 10:59:01 2017 +0530
Committer: Sunil G <sunilg@apache.org>
Committed: Mon Jul 24 11:39:03 2017 +0530

----------------------------------------------------------------------
 ...ActiveStandbyElectorBasedElectorService.java |  12 +-
 .../server/resourcemanager/AdminService.java    |  71 +++--
 .../CuratorBasedElectorService.java             |  10 +-
 .../resourcemanager/RMActiveServiceContext.java |  36 +--
 .../server/resourcemanager/RMContextImpl.java   | 312 ++++++++++---------
 .../resourcemanager/RMServiceContext.java       | 162 ++++++++++
 .../server/resourcemanager/ResourceManager.java |  35 ++-
 .../metrics/TimelineServiceV2Publisher.java     |   6 +-
 .../RMTimelineCollectorManager.java             |  10 +-
 .../yarn/server/resourcemanager/MockRM.java     |   2 +-
 .../resourcemanager/TestRMEmbeddedElector.java  |   8 +-
 .../yarn/server/resourcemanager/TestRMHA.java   |  16 +-
 .../TestSystemMetricsPublisherForV2.java        |  13 +-
 13 files changed, 451 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
index b59bc25..a8dcda4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
       new HAServiceProtocol.StateChangeRequestInfo(
           HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
 
-  private RMContext rmContext;
+  private ResourceManager rm;
 
   private byte[] localActiveNodeInfo;
   private ActiveStandbyElector elector;
@@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
   @VisibleForTesting
   final Object zkDisconnectLock = new Object();
 
-  ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
+  ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
     super(ActiveStandbyElectorBasedElectorService.class.getName());
-    this.rmContext = rmContext;
+    this.rm = rm;
   }
 
   @Override
@@ -140,7 +140,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
     cancelDisconnectTimer();
 
     try {
-      rmContext.getRMAdminService().transitionToActive(req);
+      rm.getRMContext().getRMAdminService().transitionToActive(req);
     } catch (Exception e) {
       throw new ServiceFailedException("RM could not transition to Active", e);
     }
@@ -151,7 +151,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
     cancelDisconnectTimer();
 
     try {
-      rmContext.getRMAdminService().transitionToStandby(req);
+      rm.getRMContext().getRMAdminService().transitionToStandby(req);
     } catch (Exception e) {
       LOG.error("RM could not transition to Standby", e);
     }
@@ -205,7 +205,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
   @SuppressWarnings(value = "unchecked")
   @Override
   public void notifyFatalError(String errorMessage) {
-    rmContext.getDispatcher().getEventHandler().handle(
+    rm.getRMContext().getDispatcher().getEventHandler().handle(
         new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
             errorMessage));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 7571765..3457ae3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements
 
   private static final Log LOG = LogFactory.getLog(AdminService.class);
 
-  private final RMContext rmContext;
   private final ResourceManager rm;
   private String rmId;
 
@@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements
   @VisibleForTesting
   boolean isCentralizedNodeLabelConfiguration = true;
 
-  public AdminService(ResourceManager rm, RMContext rmContext) {
+  public AdminService(ResourceManager rm) {
     super(AdminService.class.getName());
     this.rm = rm;
-    this.rmContext = rmContext;
   }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     autoFailoverEnabled =
-        rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
+        rm.getRMContext().isHAEnabled()
+            && HAUtil.isAutomaticFailoverEnabled(conf);
 
     masterServiceBindAddress = conf.getSocketAddr(
         YarnConfiguration.RM_BIND_HOST,
@@ -189,7 +188,7 @@ public class AdminService extends CompositeService implements
           RMPolicyProvider.getInstance());
     }
 
-    if (rmContext.isHAEnabled()) {
+    if (rm.getRMContext().isHAEnabled()) {
       RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
           ProtobufRpcEngine.class);
 
@@ -265,7 +264,7 @@ public class AdminService extends CompositeService implements
   }
 
   private synchronized boolean isRMActive() {
-    return HAServiceState.ACTIVE == rmContext.getHAServiceState();
+    return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState();
   }
 
   private void throwStandbyException() throws StandbyException {
@@ -304,7 +303,7 @@ public class AdminService extends CompositeService implements
       // call all refresh*s for active RM to get the updated configurations.
       refreshAll();
     } catch (Exception e) {
-      rmContext
+      rm.getRMContext()
           .getDispatcher()
           .getEventHandler()
           .handle(
@@ -363,7 +362,7 @@ public class AdminService extends CompositeService implements
   @Override
   public synchronized HAServiceStatus getServiceStatus() throws IOException {
     checkAccess("getServiceState");
-    HAServiceState haState = rmContext.getHAServiceState();
+    HAServiceState haState = rm.getRMContext().getHAServiceState();
     HAServiceStatus ret = new HAServiceStatus(haState);
     if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
       ret.setReadyToBecomeActive();
@@ -395,11 +394,12 @@ public class AdminService extends CompositeService implements
   }
 
   private void refreshQueues() throws IOException, YarnException {
-    rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+    rm.getRMContext().getScheduler().reinitialize(getConfig(),
+        this.rm.getRMContext());
     // refresh the reservation system
-    ReservationSystem rSystem = rmContext.getReservationSystem();
+    ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
     if (rSystem != null) {
-      rSystem.reinitialize(getConfig(), rmContext);
+      rSystem.reinitialize(getConfig(), rm.getRMContext());
     }
   }
 
@@ -418,14 +418,14 @@ public class AdminService extends CompositeService implements
               YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
       switch (request.getDecommissionType()) {
       case NORMAL:
-        rmContext.getNodesListManager().refreshNodes(conf);
+        rm.getRMContext().getNodesListManager().refreshNodes(conf);
         break;
       case GRACEFUL:
-        rmContext.getNodesListManager().refreshNodesGracefully(
+        rm.getRMContext().getNodesListManager().refreshNodesGracefully(
             conf, request.getDecommissionTimeout());
         break;
       case FORCEFUL:
-        rmContext.getNodesListManager().refreshNodesForcefully();
+        rm.getRMContext().getNodesListManager().refreshNodesForcefully();
         break;
       }
       RMAuditLogger.logSuccess(user.getShortUserName(), operation,
@@ -440,7 +440,7 @@ public class AdminService extends CompositeService implements
     Configuration conf =
         getConfiguration(new Configuration(false),
             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    rmContext.getNodesListManager().refreshNodes(conf);
+    rm.getRMContext().getNodesListManager().refreshNodes(conf);
   }
 
   @Override
@@ -559,10 +559,11 @@ public class AdminService extends CompositeService implements
     Configuration conf =
         getConfiguration(new Configuration(false),
             YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
-    rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
-    rmContext.getApplicationMasterService().refreshServiceAcls(
+    rm.getRMContext().getClientRMService().refreshServiceAcls(conf,
+        policyProvider);
+    rm.getRMContext().getApplicationMasterService().refreshServiceAcls(
         conf, policyProvider);
-    rmContext.getResourceTrackerService().refreshServiceAcls(
+    rm.getRMContext().getResourceTrackerService().refreshServiceAcls(
         conf, policyProvider);
   }
 
@@ -601,7 +602,7 @@ public class AdminService extends CompositeService implements
     // if any invalid nodes, throw exception instead of partially updating
     // valid nodes.
     for (NodeId nodeId : nodeIds) {
-      RMNode node = this.rmContext.getRMNodes().get(nodeId);
+      RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
       if (node == null) {
         LOG.error("Resource update get failed on all nodes due to change "
             + "resource on an unrecognized node: " + nodeId);
@@ -619,14 +620,14 @@ public class AdminService extends CompositeService implements
     for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
       ResourceOption newResourceOption = entry.getValue();
       NodeId nodeId = entry.getKey();
-      RMNode node = this.rmContext.getRMNodes().get(nodeId);
+      RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
 
       if (node == null) {
         LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
         allSuccess = false;
       } else {
         // update resource to RMNode
-        this.rmContext.getDispatcher().getEventHandler()
+        this.rm.getRMContext().getDispatcher().getEventHandler()
           .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
         LOG.info("Update resource on node(" + node.getNodeID()
             + ") with resource(" + newResourceOption.toString() + ")");
@@ -661,7 +662,8 @@ public class AdminService extends CompositeService implements
       DynamicResourceConfiguration newConf;
 
       InputStream drInputStream =
-          this.rmContext.getConfigurationProvider().getConfigurationInputStream(
+          this.rm.getRMContext().getConfigurationProvider()
+              .getConfigurationInputStream(
               configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
 
       if (drInputStream != null) {
@@ -679,7 +681,7 @@ public class AdminService extends CompositeService implements
         updateNodeResource(updateRequest);
       }
       // refresh dynamic resource in ResourceTrackerService
-      this.rmContext.getResourceTrackerService().
+      this.rm.getRMContext().getResourceTrackerService().
           updateDynamicResourceConfiguration(newConf);
       RMAuditLogger.logSuccess(user.getShortUserName(), operation,
               "AdminService");
@@ -692,7 +694,8 @@ public class AdminService extends CompositeService implements
   private synchronized Configuration getConfiguration(Configuration conf,
       String... confFileNames) throws YarnException, IOException {
     for (String confFileName : confFileNames) {
-      InputStream confFileInputStream = this.rmContext.getConfigurationProvider()
+      InputStream confFileInputStream =
+          this.rm.getRMContext().getConfigurationProvider()
           .getConfigurationInputStream(conf, confFileName);
       if (confFileInputStream != null) {
         conf.addResource(confFileInputStream);
@@ -746,7 +749,7 @@ public class AdminService extends CompositeService implements
     AddToClusterNodeLabelsResponse response =
         recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
     try {
-      rmContext.getNodeLabelManager()
+      rm.getRMContext().getNodeLabelManager()
           .addToCluserNodeLabels(request.getNodeLabels());
       RMAuditLogger.logSuccess(user.getShortUserName(), operation,
           "AdminService");
@@ -769,7 +772,8 @@ public class AdminService extends CompositeService implements
     RemoveFromClusterNodeLabelsResponse response =
         recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
     try {
-      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
+      rm.getRMContext().getNodeLabelManager()
+          .removeFromClusterNodeLabels(request.getNodeLabels());
       RMAuditLogger
           .logSuccess(user.getShortUserName(), operation, "AdminService");
       return response;
@@ -805,19 +809,20 @@ public class AdminService extends CompositeService implements
         boolean isKnown = false;
         // both active and inactive nodes are recognized as known nodes
         if (requestedNode.getPort() != 0) {
-          if (rmContext.getRMNodes().containsKey(requestedNode)
-              || rmContext.getInactiveRMNodes().containsKey(requestedNode)) {
+          if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm
+              .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) {
             isKnown = true;
           }
         } else {
-          for (NodeId knownNode : rmContext.getRMNodes().keySet()) {
+          for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) {
             if (knownNode.getHost().equals(requestedNode.getHost())) {
               isKnown = true;
               break;
             }
           }
           if (!isKnown) {
-            for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) {
+            for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes()
+                .keySet()) {
               if (knownNode.getHost().equals(requestedNode.getHost())) {
                 isKnown = true;
                 break;
@@ -841,7 +846,7 @@ public class AdminService extends CompositeService implements
       }
     }
     try {
-      rmContext.getNodeLabelManager().replaceLabelsOnNode(
+      rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(
           request.getNodeToLabels());
       RMAuditLogger
           .logSuccess(user.getShortUserName(), operation, "AdminService");
@@ -878,7 +883,7 @@ public class AdminService extends CompositeService implements
 
     checkRMStatus(user.getShortUserName(), operation, msg);
 
-    Set<NodeId> decommissioningNodes = rmContext.getNodesListManager()
+    Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager()
         .checkForDecommissioningNodes();
     RMAuditLogger.logSuccess(user.getShortUserName(), operation,
             "AdminService");
@@ -914,6 +919,6 @@ public class AdminService extends CompositeService implements
         getConfiguration(new Configuration(false),
             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
 
-    rmContext.getScheduler().setClusterMaxPriority(conf);
+    rm.getRMContext().getScheduler().setClusterMaxPriority(conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
index bcdf48b..d7485f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
@@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService
       LogFactory.getLog(CuratorBasedElectorService.class);
   private LeaderLatch leaderLatch;
   private CuratorFramework curator;
-  private RMContext rmContext;
   private String latchPath;
   private String rmId;
   private ResourceManager rm;
 
-  public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
+  public CuratorBasedElectorService(ResourceManager rm) {
     super(CuratorBasedElectorService.class.getName());
-    this.rmContext = rmContext;
     this.rm = rm;
   }
 
@@ -102,7 +100,8 @@ public class CuratorBasedElectorService extends AbstractService
   public void isLeader() {
     LOG.info(rmId + "is elected leader, transitioning to active");
     try {
-      rmContext.getRMAdminService().transitionToActive(
+      rm.getRMContext().getRMAdminService()
+          .transitionToActive(
           new HAServiceProtocol.StateChangeRequestInfo(
               HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
     } catch (Exception e) {
@@ -123,7 +122,8 @@ public class CuratorBasedElectorService extends AbstractService
   public void notLeader() {
     LOG.info(rmId + " relinquish leadership");
     try {
-      rmContext.getRMAdminService().transitionToStandby(
+      rm.getRMContext().getRMAdminService()
+          .transitionToStandby(
           new HAServiceProtocol.StateChangeRequestInfo(
               HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 0e305a9..9dc5945 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -42,20 +42,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
- * The RMActiveServiceContext is the class that maintains all the
- * RMActiveService contexts.This is expected to be used only by ResourceManager
- * and RMContext.
+ * The RMActiveServiceContext is the class that maintains <b>Active</b> service
+ * context, i.e. services that need to run only on the Active RM. This is
+ * expected to be used only by RMContext.
  */
 @Private
 @Unstable
@@ -94,7 +94,6 @@ public class RMActiveServiceContext {
   private NodesListManager nodesListManager;
   private ResourceTrackerService resourceTrackerService;
   private ApplicationMasterService applicationMasterService;
-  private RMTimelineCollectorManager timelineCollectorManager;
 
   private RMNodeLabelsManager nodeLabelManager;
   private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
@@ -107,6 +106,7 @@ public class RMActiveServiceContext {
   private PlacementManager queuePlacementManager = null;
 
   private RMAppLifetimeMonitor rmAppLifetimeMonitor;
+  private QueueLimitCalculator queueLimitCalculator;
 
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
@@ -374,19 +374,6 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
-  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
-    return timelineCollectorManager;
-  }
-
-  @Private
-  @Unstable
-  public void setRMTimelineCollectorManager(
-      RMTimelineCollectorManager collectorManager) {
-    this.timelineCollectorManager = collectorManager;
-  }
-
-  @Private
-  @Unstable
   public long getEpoch() {
     return this.epoch;
   }
@@ -483,4 +470,17 @@ public class RMActiveServiceContext {
   public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
     return this.rmAppLifetimeMonitor;
   }
+
+  @Private
+  @Unstable
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return this.queueLimitCalculator;
+  }
+
+  @Private
+  @Unstable
+  public void setContainerQueueLimitCalculator(
+      QueueLimitCalculator limitCalculator) {
+    this.queueLimitCalculator = limitCalculator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index fb160c4..db2c585 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -57,37 +56,39 @@ import org.apache.hadoop.yarn.util.Clock;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * RMContextImpl class holds two service contexts.
+ * <ul>
+ * <li>serviceContext : These services are called <b>Always On</b> services.
+ * Services that need to run always irrespective of the HA state of the RM.</li>
+ * <li>activeServiceContext : Active services context. Services that need to run
+ * only on the Active RM.</li>
+ * </ul>
+ * <p>
+ * <b>Note:</b> If any new service is to be added to the context, add it to the
+ * right context as per the above description.
+ */
 public class RMContextImpl implements RMContext {
 
-  private Dispatcher rmDispatcher;
-
-  private boolean isHAEnabled;
-
-  private HAServiceState haServiceState =
-      HAServiceProtocol.HAServiceState.INITIALIZING;
-
-  private AdminService adminService;
-
-  private ConfigurationProvider configurationProvider;
+  /**
+   * RM service contexts which run throughout RM life span. These are created
+   * once during start of RM.
+   */
+  private RMServiceContext serviceContext;
 
+  /**
+   * RM Active service context. This will be recreated for every transition from
+   * ACTIVE->STANDBY.
+   */
   private RMActiveServiceContext activeServiceContext;
 
-  private Configuration yarnConfiguration;
-
-  private RMApplicationHistoryWriter rmApplicationHistoryWriter;
-  private SystemMetricsPublisher systemMetricsPublisher;
-  private EmbeddedElector elector;
-
-  private QueueLimitCalculator queueLimitCalculator;
-
-  private final Object haServiceStateLock = new Object();
-
-  private ResourceManager resourceManager;
   /**
    * Default constructor. To be used in conjunction with setter methods for
    * individual fields.
    */
   public RMContextImpl() {
+    this.serviceContext = new RMServiceContext();
+    this.activeServiceContext = new RMActiveServiceContext();
   }
 
   @VisibleForTesting
@@ -138,19 +139,154 @@ public class RMContextImpl implements RMContext {
       clientToAMTokenSecretManager, null);
   }
 
+  /**
+   * RM service contexts which run throughout JVM life span. These are created
+   * once during start of RM.
+   * @return serviceContext of RM
+   */
+  @Private
+  @Unstable
+  public RMServiceContext getServiceContext() {
+    return serviceContext;
+  }
+
+  /**
+   * <b>Note:</b> setting service context clears all services embedded with it.
+   * @param context rm service context
+   */
+  @Private
+  @Unstable
+  public void setServiceContext(RMServiceContext context) {
+    this.serviceContext = context;
+  }
+
   @Override
-  public Dispatcher getDispatcher() {
-    return this.rmDispatcher;
+  public ResourceManager getResourceManager() {
+    return serviceContext.getResourceManager();
+  }
+
+  public void setResourceManager(ResourceManager rm) {
+    serviceContext.setResourceManager(rm);
+  }
+
+  @Override
+  public EmbeddedElector getLeaderElectorService() {
+    return serviceContext.getLeaderElectorService();
   }
 
   @Override
   public void setLeaderElectorService(EmbeddedElector elector) {
-    this.elector = elector;
+    serviceContext.setLeaderElectorService(elector);
   }
 
   @Override
-  public EmbeddedElector getLeaderElectorService() {
-    return this.elector;
+  public Dispatcher getDispatcher() {
+    return serviceContext.getDispatcher();
+  }
+
+  void setDispatcher(Dispatcher dispatcher) {
+    serviceContext.setDispatcher(dispatcher);
+  }
+
+  @Override
+  public AdminService getRMAdminService() {
+    return serviceContext.getRMAdminService();
+  }
+
+  void setRMAdminService(AdminService adminService) {
+    serviceContext.setRMAdminService(adminService);
+  }
+
+  @Override
+  public boolean isHAEnabled() {
+    return serviceContext.isHAEnabled();
+  }
+
+  void setHAEnabled(boolean isHAEnabled) {
+    serviceContext.setHAEnabled(isHAEnabled);
+  }
+
+  @Override
+  public HAServiceState getHAServiceState() {
+    return serviceContext.getHAServiceState();
+  }
+
+  void setHAServiceState(HAServiceState serviceState) {
+    serviceContext.setHAServiceState(serviceState);
+  }
+
+  @Override
+  public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+    return serviceContext.getRMApplicationHistoryWriter();
+  }
+
+  @Override
+  public void setRMApplicationHistoryWriter(
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+  }
+
+  @Override
+  public SystemMetricsPublisher getSystemMetricsPublisher() {
+    return serviceContext.getSystemMetricsPublisher();
+  }
+
+  @Override
+  public void setSystemMetricsPublisher(
+      SystemMetricsPublisher metricsPublisher) {
+    serviceContext.setSystemMetricsPublisher(metricsPublisher);
+  }
+
+  @Override
+  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+    return serviceContext.getRMTimelineCollectorManager();
+  }
+
+  @Override
+  public void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager) {
+    serviceContext.setRMTimelineCollectorManager(timelineCollectorManager);
+  }
+
+  @Override
+  public ConfigurationProvider getConfigurationProvider() {
+    return serviceContext.getConfigurationProvider();
+  }
+
+  public void setConfigurationProvider(
+      ConfigurationProvider configurationProvider) {
+    serviceContext.setConfigurationProvider(configurationProvider);
+  }
+
+  @Override
+  public Configuration getYarnConfiguration() {
+    return serviceContext.getYarnConfiguration();
+  }
+
+  public void setYarnConfiguration(Configuration yarnConfiguration) {
+    serviceContext.setYarnConfiguration(yarnConfiguration);
+  }
+
+  public String getHAZookeeperConnectionState() {
+    return serviceContext.getHAZookeeperConnectionState();
+  }
+
+  // ==========================================================================
+  /**
+   * RM Active service context. This will be recreated for every transition from
+   * ACTIVE to STANDBY.
+   * @return activeServiceContext of active services
+   */
+  @Private
+  @Unstable
+  public RMActiveServiceContext getActiveServiceContext() {
+    return activeServiceContext;
+  }
+
+  @Private
+  @Unstable
+  void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
+    this.activeServiceContext = activeServiceContext;
   }
 
   @Override
@@ -228,11 +364,6 @@ public class RMContextImpl implements RMContext {
     return activeServiceContext.getClientToAMTokenSecretManager();
   }
 
-  @Override
-  public AdminService getRMAdminService() {
-    return this.adminService;
-  }
-
   @VisibleForTesting
   public void setStateStore(RMStateStore store) {
     activeServiceContext.setStateStore(store);
@@ -253,24 +384,6 @@ public class RMContextImpl implements RMContext {
     return activeServiceContext.getResourceTrackerService();
   }
 
-  void setHAEnabled(boolean isHAEnabled) {
-    this.isHAEnabled = isHAEnabled;
-  }
-
-  void setHAServiceState(HAServiceState serviceState) {
-    synchronized (haServiceStateLock) {
-      this.haServiceState = serviceState;
-    }
-  }
-
-  void setDispatcher(Dispatcher dispatcher) {
-    this.rmDispatcher = dispatcher;
-  }
-
-  void setRMAdminService(AdminService adminService) {
-    this.adminService = adminService;
-  }
-
   @Override
   public void setClientRMService(ClientRMService clientRMService) {
     activeServiceContext.setClientRMService(clientRMService);
@@ -348,18 +461,6 @@ public class RMContextImpl implements RMContext {
     activeServiceContext.setResourceTrackerService(resourceTrackerService);
   }
 
-  @Override
-  public boolean isHAEnabled() {
-    return isHAEnabled;
-  }
-
-  @Override
-  public HAServiceState getHAServiceState() {
-    synchronized (haServiceStateLock) {
-      return haServiceState;
-    }
-  }
-
   public void setWorkPreservingRecoveryEnabled(boolean enabled) {
     activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
   }
@@ -369,50 +470,6 @@ public class RMContextImpl implements RMContext {
     return activeServiceContext.isWorkPreservingRecoveryEnabled();
   }
 
-  @Override
-  public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
-    return this.rmApplicationHistoryWriter;
-  }
-
-  @Override
-  public void setRMTimelineCollectorManager(
-      RMTimelineCollectorManager timelineCollectorManager) {
-    activeServiceContext.setRMTimelineCollectorManager(
-        timelineCollectorManager);
-  }
-
-  @Override
-  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
-    return activeServiceContext.getRMTimelineCollectorManager();
-  }
-
-  @Override
-  public void setSystemMetricsPublisher(
-      SystemMetricsPublisher metricsPublisher) {
-    this.systemMetricsPublisher = metricsPublisher;
-  }
-
-  @Override
-  public SystemMetricsPublisher getSystemMetricsPublisher() {
-    return this.systemMetricsPublisher;
-  }
-
-  @Override
-  public void setRMApplicationHistoryWriter(
-      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
-    this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
-
-  }
-
-  @Override
-  public ConfigurationProvider getConfigurationProvider() {
-    return this.configurationProvider;
-  }
-
-  public void setConfigurationProvider(
-      ConfigurationProvider configurationProvider) {
-    this.configurationProvider = configurationProvider;
-  }
 
   @Override
   public long getEpoch() {
@@ -463,27 +520,6 @@ public class RMContextImpl implements RMContext {
     return activeServiceContext.getSystemCredentialsForApps();
   }
 
-  @Private
-  @Unstable
-  public RMActiveServiceContext getActiveServiceContext() {
-    return activeServiceContext;
-  }
-
-  @Private
-  @Unstable
-  void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
-    this.activeServiceContext = activeServiceContext;
-  }
-
-  @Override
-  public Configuration getYarnConfiguration() {
-    return this.yarnConfiguration;
-  }
-
-  public void setYarnConfiguration(Configuration yarnConfiguration) {
-    this.yarnConfiguration=yarnConfiguration;
-  }
-
   @Override
   public PlacementManager getQueuePlacementManager() {
     return this.activeServiceContext.getQueuePlacementManager();
@@ -496,12 +532,12 @@ public class RMContextImpl implements RMContext {
 
   @Override
   public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
-    return this.queueLimitCalculator;
+    return activeServiceContext.getNodeManagerQueueLimitCalculator();
   }
 
   public void setContainerQueueLimitCalculator(
       QueueLimitCalculator limitCalculator) {
-    this.queueLimitCalculator = limitCalculator;
+    activeServiceContext.setContainerQueueLimitCalculator(limitCalculator);
   }
 
   @Override
@@ -515,21 +551,5 @@ public class RMContextImpl implements RMContext {
     return this.activeServiceContext.getRMAppLifetimeMonitor();
   }
 
-  public String getHAZookeeperConnectionState() {
-    if (elector == null) {
-      return "Could not find leader elector. Verify both HA and automatic " +
-          "failover are enabled.";
-    } else {
-      return elector.getZookeeperConnectionState();
-    }
-  }
-
-  @Override
-  public ResourceManager getResourceManager() {
-    return resourceManager;
-  }
-
-  public void setResourceManager(ResourceManager rm) {
-    this.resourceManager = rm;
-  }
+  // Note: Read the class Javadoc before adding any services here.
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
new file mode 100644
index 0000000..45c6166
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+
+/**
+ * RMServiceContext class maintains "Always On" services, i.e. services that
+ * need to run always irrespective of the HA state of the RM. This is created
+ * during initialization of RMContextImpl.
+ * <p>
+ * <b>Note:</b> If any service is to be added to this class, make sure the
+ * service always runs irrespective of the HA state of the RM.
+ */
+@Private
+@Unstable
+public class RMServiceContext {
+
+  private Dispatcher rmDispatcher;
+  private boolean isHAEnabled;
+  private HAServiceState haServiceState =
+      HAServiceProtocol.HAServiceState.INITIALIZING;
+  private AdminService adminService;
+  private ConfigurationProvider configurationProvider;
+  private Configuration yarnConfiguration;
+  private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+  private SystemMetricsPublisher systemMetricsPublisher;
+  private EmbeddedElector elector;
+  private final Object haServiceStateLock = new Object();
+  private ResourceManager resourceManager;
+  private RMTimelineCollectorManager timelineCollectorManager;
+
+  public ResourceManager getResourceManager() {
+    return resourceManager;
+  }
+
+  public void setResourceManager(ResourceManager rm) {
+    this.resourceManager = rm;
+  }
+
+  public ConfigurationProvider getConfigurationProvider() {
+    return this.configurationProvider;
+  }
+
+  public void setConfigurationProvider(
+      ConfigurationProvider configurationProvider) {
+    this.configurationProvider = configurationProvider;
+  }
+
+  public Dispatcher getDispatcher() {
+    return this.rmDispatcher;
+  }
+
+  void setDispatcher(Dispatcher dispatcher) {
+    this.rmDispatcher = dispatcher;
+  }
+
+  public EmbeddedElector getLeaderElectorService() {
+    return this.elector;
+  }
+
+  public void setLeaderElectorService(EmbeddedElector embeddedElector) {
+    this.elector = embeddedElector;
+  }
+
+  public AdminService getRMAdminService() {
+    return this.adminService;
+  }
+
+  void setRMAdminService(AdminService service) {
+    this.adminService = service;
+  }
+
+  void setHAEnabled(boolean rmHAEnabled) {
+    this.isHAEnabled = rmHAEnabled;
+  }
+
+  public boolean isHAEnabled() {
+    return isHAEnabled;
+  }
+
+  public HAServiceState getHAServiceState() {
+    synchronized (haServiceStateLock) {
+      return haServiceState;
+    }
+  }
+
+  void setHAServiceState(HAServiceState serviceState) {
+    synchronized (haServiceStateLock) {
+      this.haServiceState = serviceState;
+    }
+  }
+
+  public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+    return this.rmApplicationHistoryWriter;
+  }
+
+  public void setRMApplicationHistoryWriter(
+      RMApplicationHistoryWriter applicationHistoryWriter) {
+    this.rmApplicationHistoryWriter = applicationHistoryWriter;
+  }
+
+  public void setSystemMetricsPublisher(
+      SystemMetricsPublisher metricsPublisher) {
+    this.systemMetricsPublisher = metricsPublisher;
+  }
+
+  public SystemMetricsPublisher getSystemMetricsPublisher() {
+    return this.systemMetricsPublisher;
+  }
+
+  public Configuration getYarnConfiguration() {
+    return this.yarnConfiguration;
+  }
+
+  public void setYarnConfiguration(Configuration yarnConfiguration) {
+    this.yarnConfiguration = yarnConfiguration;
+  }
+
+  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+    return timelineCollectorManager;
+  }
+
+  public void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager collectorManager) {
+    this.timelineCollectorManager = collectorManager;
+  }
+
+  public String getHAZookeeperConnectionState() {
+    if (elector == null) {
+      return "Could not find leader elector. Verify both HA and automatic "
+          + "failover are enabled.";
+    } else {
+      return elector.getZookeeperConnectionState();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index f727f55..b63b60d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -115,7 +115,6 @@ import org.eclipse.jetty.webapp.WebAppContext;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
@@ -345,9 +344,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
             YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
     if (curatorEnabled) {
       this.curator = createAndStartCurator(conf);
-      elector = new CuratorBasedElectorService(rmContext, this);
+      elector = new CuratorBasedElectorService(this);
     } else {
-      elector = new ActiveStandbyElectorBasedElectorService(rmContext);
+      elector = new ActiveStandbyElectorBasedElectorService(this);
     }
     return elector;
   }
@@ -497,7 +496,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   private RMTimelineCollectorManager createRMTimelineCollectorManager() {
-    return new RMTimelineCollectorManager(rmContext);
+    return new RMTimelineCollectorManager(this);
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
@@ -508,7 +507,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
         // we're dealing with the v.2.x publisher
         LOG.info("system metrics publisher with the timeline service V2 is " +
             "configured");
-        publisher = new TimelineServiceV2Publisher(rmContext);
+        publisher = new TimelineServiceV2Publisher(
+            rmContext.getRMTimelineCollectorManager());
       } else {
         // we're dealing with the v.1.x publisher
         LOG.info("system metrics publisher with the timeline service V1 is " +
@@ -560,7 +560,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private ApplicationMasterLauncher applicationMasterLauncher;
     private ContainerAllocationExpirer containerAllocationExpirer;
     private ResourceManager rm;
-    private RMActiveServiceContext activeServiceContext;
     private boolean fromActive = false;
     private StandByTransitionRunnable standByTransitionRunnable;
 
@@ -573,9 +572,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     protected void serviceInit(Configuration configuration) throws Exception {
       standByTransitionRunnable = new StandByTransitionRunnable();
 
-      activeServiceContext = new RMActiveServiceContext();
-      rmContext.setActiveServiceContext(activeServiceContext);
-
       rmSecretManagerService = createRMSecretManagerService();
       addService(rmSecretManagerService);
 
@@ -1149,7 +1145,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     ClusterMetrics.destroy();
     QueueMetrics.clearQueueMetrics();
     if (initialize) {
-      resetDispatcher();
+      resetRMContext();
       createAndInitActiveServices(true);
     }
   }
@@ -1294,7 +1290,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected AdminService createAdminService() {
-    return new AdminService(this, rmContext);
+    return new AdminService(this);
   }
 
   protected RMSecretManagerService createRMSecretManagerService() {
@@ -1417,17 +1413,24 @@ public class ResourceManager extends CompositeService implements Recoverable {
     return dispatcher;
   }
 
-  private void resetDispatcher() {
+  private void resetRMContext() {
+    RMContextImpl rmContextImpl = new RMContextImpl();
+    // transfer the existing service context to the new RMContext
+    rmContextImpl.setServiceContext(rmContext.getServiceContext());
+
+    // reset dispatcher
     Dispatcher dispatcher = setupDispatcher();
-    ((Service)dispatcher).init(this.conf);
-    ((Service)dispatcher).start();
-    removeService((Service)rmDispatcher);
+    ((Service) dispatcher).init(this.conf);
+    ((Service) dispatcher).start();
+    removeService((Service) rmDispatcher);
     // Need to stop previous rmDispatcher before assigning new dispatcher
     // otherwise causes "AsyncDispatcher event handler" thread leak
     ((Service) rmDispatcher).stop();
     rmDispatcher = dispatcher;
     addIfService(rmDispatcher);
-    rmContext.setDispatcher(rmDispatcher);
+    rmContextImpl.setDispatcher(dispatcher);
+
+    rmContext = rmContextImpl;
   }
 
   private void setSchedulerRecoveryStartAndWaitTime(RMState state,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index a8bf6bd..a3a2ebc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -75,9 +74,10 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
   private RMTimelineCollectorManager rmTimelineCollectorManager;
   private boolean publishContainerEvents;
 
-  public TimelineServiceV2Publisher(RMContext rmContext) {
+  public TimelineServiceV2Publisher(
+      RMTimelineCollectorManager timelineCollectorManager) {
     super("TimelineserviceV2Publisher");
-    rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
+    rmTimelineCollectorManager = timelineCollectorManager;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
index 64c3749..c980458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
@@ -41,16 +41,16 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
   private static final Log LOG =
       LogFactory.getLog(RMTimelineCollectorManager.class);
 
-  private RMContext rmContext;
+  private ResourceManager rm;
 
-  public RMTimelineCollectorManager(RMContext rmContext) {
+  public RMTimelineCollectorManager(ResourceManager resourceManager) {
     super(RMTimelineCollectorManager.class.getName());
-    this.rmContext = rmContext;
+    this.rm = resourceManager;
   }
 
   @Override
   protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
-    RMApp app = rmContext.getRMApps().get(appId);
+    RMApp app = rm.getRMContext().getRMApps().get(appId);
     if (app == null) {
       throw new YarnRuntimeException(
           "Unable to get the timeline collector context info for a " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 23009db..5a215e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -1055,7 +1055,7 @@ public class MockRM extends ResourceManager {
 
   @Override
   protected AdminService createAdminService() {
-    return new AdminService(this, getRMContext()) {
+    return new AdminService(this) {
       @Override
       protected void startServer() {
         // override to not start rpc handler

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index 1fe9bbe..140483a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -122,13 +122,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
       throws IOException, InterruptedException {
     AdminService as = mock(AdminService.class);
     RMContext rc = mock(RMContext.class);
+    ResourceManager rm = mock(ResourceManager.class);
     Configuration myConf = new Configuration(conf);
 
     myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
+    when(rm.getRMContext()).thenReturn(rc);
     when(rc.getRMAdminService()).thenReturn(as);
 
-    ActiveStandbyElectorBasedElectorService
-        ees = new ActiveStandbyElectorBasedElectorService(rc);
+    ActiveStandbyElectorBasedElectorService ees =
+        new ActiveStandbyElectorBasedElectorService(rm);
     ees.init(myConf);
 
     ees.enterNeutralMode();
@@ -290,7 +292,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
 
     @Override
     protected EmbeddedElector createEmbeddedElector() {
-      return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
+      return new ActiveStandbyElectorBasedElectorService(this) {
         @Override
         public void becomeActive() throws
             ServiceFailedException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index f807217..ec6b1e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -71,6 +71,7 @@ public class TestRMHA {
   private Log LOG = LogFactory.getLog(TestRMHA.class);
   private Configuration configuration;
   private MockRM rm = null;
+  private MockNM nm = null;
   private RMApp app = null;
   private RMAppAttempt attempt = null;
   private static final String STATE_ERR =
@@ -135,7 +136,7 @@ public class TestRMHA {
 
     try {
       rm.getNewAppId();
-      rm.registerNode("127.0.0.1:1", 2048);
+      nm = rm.registerNode("127.0.0.1:1", 2048);
       app = rm.submitApp(1024);
       attempt = app.getCurrentAppAttempt();
       rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
@@ -551,6 +552,17 @@ public class TestRMHA {
     verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
     assertEquals(1, rm.getRMContext().getRMNodes().size());
     assertEquals(1, rm.getRMContext().getRMApps().size());
+    Assert.assertNotNull("Node not registered", nm);
+
+    rm.adminService.transitionToStandby(requestInfo);
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+    // A race condition can let a node register/heartbeat even after the
+    // service is stopping/stopped. A new RMContext is created on every
+    // transition to standby, so metrics should be 0, which indicates that the
+    // new context reference has been taken.
+    nm.registerNode();
+    verifyClusterMetrics(0, 0, 0, 0, 0, 0);
 
     // 3. Create new RM
     rm = new MockRM(conf, memStore) {
@@ -592,7 +604,7 @@ public class TestRMHA {
     rm = new MockRM(configuration) {
       @Override
       protected AdminService createAdminService() {
-        return new AdminService(this, getRMContext()) {
+        return new AdminService(this) {
           int counter = 0;
           @Override
           protected void setConfig(Configuration conf) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 2d40c91..ec09945 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -98,10 +99,12 @@ public class TestSystemMetricsPublisherForV2 {
           new Path(testRootDir.getAbsolutePath()), true);
     }
 
+    ResourceManager rm = mock(ResourceManager.class);
     RMContext rmContext = mock(RMContext.class);
     rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
     when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
-    rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
+    when(rm.getRMContext()).thenReturn(rmContext);
+    rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
     when(rmContext.getRMTimelineCollectorManager()).thenReturn(
         rmTimelineCollectorManager);
 
@@ -113,7 +116,8 @@ public class TestSystemMetricsPublisherForV2 {
 
     dispatcher.init(conf);
     dispatcher.start();
-    metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
+    metricsPublisher =
+        new TimelineServiceV2Publisher(rmTimelineCollectorManager) {
       @Override
       protected Dispatcher getDispatcher() {
         return dispatcher;
@@ -162,7 +166,7 @@ public class TestSystemMetricsPublisherForV2 {
   public void testSystemMetricPublisherInitialization() {
     @SuppressWarnings("resource")
     TimelineServiceV2Publisher publisher =
-        new TimelineServiceV2Publisher(mock(RMContext.class));
+        new TimelineServiceV2Publisher(mock(RMTimelineCollectorManager.class));
     try {
       Configuration conf = getTimelineV2Conf();
       conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
@@ -174,7 +178,8 @@ public class TestSystemMetricsPublisherForV2 {
 
       publisher.stop();
 
-      publisher = new TimelineServiceV2Publisher(mock(RMContext.class));
+      publisher = new TimelineServiceV2Publisher(
+          mock(RMTimelineCollectorManager.class));
       conf = getTimelineV2Conf();
       publisher.init(conf);
       assertTrue("Expected to have registered event handlers and set ready to "


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message