hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject hadoop git commit: YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks
Date Thu, 20 Nov 2014 03:49:27 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 765aecb4e -> 9cb8b75ba


YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active.
Contributed by Rohith Sharmaks


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9cb8b75b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9cb8b75b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9cb8b75b

Branch: refs/heads/trunk
Commit: 9cb8b75ba57f18639492bfa3b7e7c11c00bb3d3b
Parents: 765aecb
Author: Jian He <jianhe@apache.org>
Authored: Wed Nov 19 19:48:33 2014 -0800
Committer: Jian He <jianhe@apache.org>
Committed: Wed Nov 19 19:48:52 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/RMActiveServiceContext.java | 455 +++++++++++++++++++
 .../server/resourcemanager/RMContextImpl.java   | 235 ++++------
 .../server/resourcemanager/ResourceManager.java |  26 +-
 .../yarn/server/resourcemanager/TestRMHA.java   |  62 +++
 5 files changed, 626 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7f61601..683f38c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -115,6 +115,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2878. Fix DockerContainerExecutor.apt.vm formatting. (Abin Shahab via
     jianhe)
 
+    YARN-2865. Fixed RM to always create a new RMContext when transtions from
+    StandBy to Active. (Rohith Sharmaks via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
new file mode 100644
index 0000000..3bc2e9b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * The RMActiveServiceContext is the class that maintains all the
+ * RMActiveService contexts.This is expected to be used only by ResourceManager
+ * and RMContext.
+ */
+@Private
+@Unstable
+public class RMActiveServiceContext {
+
+  private static final Log LOG = LogFactory
+      .getLog(RMActiveServiceContext.class);
+
+  private final ConcurrentMap<ApplicationId, RMApp> applications =
+      new ConcurrentHashMap<ApplicationId, RMApp>();
+
+  private final ConcurrentMap<NodeId, RMNode> nodes =
+      new ConcurrentHashMap<NodeId, RMNode>();
+
+  private final ConcurrentMap<String, RMNode> inactiveNodes =
+      new ConcurrentHashMap<String, RMNode>();
+
+  private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>();
+
+  private boolean isWorkPreservingRecoveryEnabled;
+
+  private AMLivelinessMonitor amLivelinessMonitor;
+  private AMLivelinessMonitor amFinishingMonitor;
+  private RMStateStore stateStore = null;
+  private ContainerAllocationExpirer containerAllocationExpirer;
+  private DelegationTokenRenewer delegationTokenRenewer;
+  private AMRMTokenSecretManager amRMTokenSecretManager;
+  private RMContainerTokenSecretManager containerTokenSecretManager;
+  private NMTokenSecretManagerInRM nmTokenSecretManager;
+  private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+  private ClientRMService clientRMService;
+  private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
+  private ResourceScheduler scheduler;
+  private ReservationSystem reservationSystem;
+  private NodesListManager nodesListManager;
+  private ResourceTrackerService resourceTrackerService;
+  private ApplicationMasterService applicationMasterService;
+  private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+  private SystemMetricsPublisher systemMetricsPublisher;
+  private RMNodeLabelsManager nodeLabelManager;
+  private long epoch;
+  private Clock systemClock = new SystemClock();
+  private long schedulerRecoveryStartTime = 0;
+  private long schedulerRecoveryWaitTime = 0;
+  private boolean printLog = true;
+  private boolean isSchedulerReady = false;
+
+  public RMActiveServiceContext() {
+
+  }
+
+  @Private
+  @Unstable
+  public RMActiveServiceContext(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer delegationTokenRenewer,
+      AMRMTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      NMTokenSecretManagerInRM nmTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    this();
+    this.setContainerAllocationExpirer(containerAllocationExpirer);
+    this.setAMLivelinessMonitor(amLivelinessMonitor);
+    this.setAMFinishingMonitor(amFinishingMonitor);
+    this.setDelegationTokenRenewer(delegationTokenRenewer);
+    this.setAMRMTokenSecretManager(appTokenSecretManager);
+    this.setContainerTokenSecretManager(containerTokenSecretManager);
+    this.setNMTokenSecretManager(nmTokenSecretManager);
+    this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
+    this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+
+    RMStateStore nullStore = new NullRMStateStore();
+    nullStore.setRMDispatcher(rmDispatcher);
+    try {
+      nullStore.init(new YarnConfiguration());
+      setStateStore(nullStore);
+    } catch (Exception e) {
+      assert false;
+    }
+  }
+
+  @Private
+  @Unstable
+  public void setStateStore(RMStateStore store) {
+    stateStore = store;
+  }
+
+  @Private
+  @Unstable
+  public ClientRMService getClientRMService() {
+    return clientRMService;
+  }
+
+  @Private
+  @Unstable
+  public ApplicationMasterService getApplicationMasterService() {
+    return applicationMasterService;
+  }
+
+  @Private
+  @Unstable
+  public ResourceTrackerService getResourceTrackerService() {
+    return resourceTrackerService;
+  }
+
+  @Private
+  @Unstable
+  public RMStateStore getStateStore() {
+    return stateStore;
+  }
+
+  @Private
+  @Unstable
+  public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
+    return this.applications;
+  }
+
+  @Private
+  @Unstable
+  public ConcurrentMap<NodeId, RMNode> getRMNodes() {
+    return this.nodes;
+  }
+
+  @Private
+  @Unstable
+  public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
+    return this.inactiveNodes;
+  }
+
+  @Private
+  @Unstable
+  public ContainerAllocationExpirer getContainerAllocationExpirer() {
+    return this.containerAllocationExpirer;
+  }
+
+  @Private
+  @Unstable
+  public AMLivelinessMonitor getAMLivelinessMonitor() {
+    return this.amLivelinessMonitor;
+  }
+
+  @Private
+  @Unstable
+  public AMLivelinessMonitor getAMFinishingMonitor() {
+    return this.amFinishingMonitor;
+  }
+
+  @Private
+  @Unstable
+  public DelegationTokenRenewer getDelegationTokenRenewer() {
+    return delegationTokenRenewer;
+  }
+
+  @Private
+  @Unstable
+  public AMRMTokenSecretManager getAMRMTokenSecretManager() {
+    return this.amRMTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+    return this.containerTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  public NMTokenSecretManagerInRM getNMTokenSecretManager() {
+    return this.nmTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  public ResourceScheduler getScheduler() {
+    return this.scheduler;
+  }
+
+  @Private
+  @Unstable
+  public ReservationSystem getReservationSystem() {
+    return this.reservationSystem;
+  }
+
+  @Private
+  @Unstable
+  public NodesListManager getNodesListManager() {
+    return this.nodesListManager;
+  }
+
+  @Private
+  @Unstable
+  public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
+    return this.clientToAMTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  public void setClientRMService(ClientRMService clientRMService) {
+    this.clientRMService = clientRMService;
+  }
+
+  @Private
+  @Unstable
+  public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
+    return this.rmDelegationTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  public void setRMDelegationTokenSecretManager(
+      RMDelegationTokenSecretManager delegationTokenSecretManager) {
+    this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  void setContainerAllocationExpirer(
+      ContainerAllocationExpirer containerAllocationExpirer) {
+    this.containerAllocationExpirer = containerAllocationExpirer;
+  }
+
+  @Private
+  @Unstable
+  void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
+    this.amLivelinessMonitor = amLivelinessMonitor;
+  }
+
+  @Private
+  @Unstable
+  void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
+    this.amFinishingMonitor = amFinishingMonitor;
+  }
+
+  @Private
+  @Unstable
+  void setContainerTokenSecretManager(
+      RMContainerTokenSecretManager containerTokenSecretManager) {
+    this.containerTokenSecretManager = containerTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
+    this.nmTokenSecretManager = nmTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  void setScheduler(ResourceScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  @Private
+  @Unstable
+  void setReservationSystem(ReservationSystem reservationSystem) {
+    this.reservationSystem = reservationSystem;
+  }
+
+  @Private
+  @Unstable
+  void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
+    this.delegationTokenRenewer = delegationTokenRenewer;
+  }
+
+  @Private
+  @Unstable
+  void setClientToAMTokenSecretManager(
+      ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
+    this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
+    this.amRMTokenSecretManager = amRMTokenSecretManager;
+  }
+
+  @Private
+  @Unstable
+  void setNodesListManager(NodesListManager nodesListManager) {
+    this.nodesListManager = nodesListManager;
+  }
+
+  @Private
+  @Unstable
+  void setApplicationMasterService(
+      ApplicationMasterService applicationMasterService) {
+    this.applicationMasterService = applicationMasterService;
+  }
+
+  @Private
+  @Unstable
+  void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
+    this.resourceTrackerService = resourceTrackerService;
+  }
+
+  @Private
+  @Unstable
+  public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+    this.isWorkPreservingRecoveryEnabled = enabled;
+  }
+
+  @Private
+  @Unstable
+  public boolean isWorkPreservingRecoveryEnabled() {
+    return this.isWorkPreservingRecoveryEnabled;
+  }
+
+  @Private
+  @Unstable
+  public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+    return rmApplicationHistoryWriter;
+  }
+
+  @Private
+  @Unstable
+  public void setSystemMetricsPublisher(
+      SystemMetricsPublisher systemMetricsPublisher) {
+    this.systemMetricsPublisher = systemMetricsPublisher;
+  }
+
+  @Private
+  @Unstable
+  public SystemMetricsPublisher getSystemMetricsPublisher() {
+    return systemMetricsPublisher;
+  }
+
+  @Private
+  @Unstable
+  public void setRMApplicationHistoryWriter(
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+  }
+
+  @Private
+  @Unstable
+  public long getEpoch() {
+    return this.epoch;
+  }
+
+  @Private
+  @Unstable
+  void setEpoch(long epoch) {
+    this.epoch = epoch;
+  }
+
+  @Private
+  @Unstable
+  public RMNodeLabelsManager getNodeLabelManager() {
+    return nodeLabelManager;
+  }
+
+  @Private
+  @Unstable
+  public void setNodeLabelManager(RMNodeLabelsManager mgr) {
+    nodeLabelManager = mgr;
+  }
+
+  @Private
+  @Unstable
+  public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
+    this.schedulerRecoveryStartTime = systemClock.getTime();
+    this.schedulerRecoveryWaitTime = waitTime;
+  }
+
+  @Private
+  @Unstable
+  public boolean isSchedulerReadyForAllocatingContainers() {
+    if (isSchedulerReady) {
+      return isSchedulerReady;
+    }
+    isSchedulerReady =
+        (systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime;
+    if (!isSchedulerReady && printLog) {
+      LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
+      printLog = false;
+    }
+    if (isSchedulerReady) {
+      LOG.info("Scheduler recovery is done. Start allocating new containers.");
+    }
+    return isSchedulerReady;
+  }
+
+  @Private
+  @Unstable
+  public void setSystemClock(Clock clock) {
+    this.systemClock = clock;
+  }
+
+  @Private
+  @Unstable
+  public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+    return systemCredentials;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 7c1db3d..55d7667 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -19,24 +19,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -59,52 +54,16 @@ public class RMContextImpl implements RMContext {
 
   private Dispatcher rmDispatcher;
 
-  private final ConcurrentMap<ApplicationId, RMApp> applications
-    = new ConcurrentHashMap<ApplicationId, RMApp>();
-
-  private final ConcurrentMap<NodeId, RMNode> nodes
-    = new ConcurrentHashMap<NodeId, RMNode>();
-  
-  private final ConcurrentMap<String, RMNode> inactiveNodes
-    = new ConcurrentHashMap<String, RMNode>();
-
-  private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>();
-
   private boolean isHAEnabled;
-  private boolean isWorkPreservingRecoveryEnabled;
+
   private HAServiceState haServiceState =
       HAServiceProtocol.HAServiceState.INITIALIZING;
-  
-  private AMLivelinessMonitor amLivelinessMonitor;
-  private AMLivelinessMonitor amFinishingMonitor;
-  private RMStateStore stateStore = null;
-  private ContainerAllocationExpirer containerAllocationExpirer;
-  private DelegationTokenRenewer delegationTokenRenewer;
-  private AMRMTokenSecretManager amRMTokenSecretManager;
-  private RMContainerTokenSecretManager containerTokenSecretManager;
-  private NMTokenSecretManagerInRM nmTokenSecretManager;
-  private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+
   private AdminService adminService;
-  private ClientRMService clientRMService;
-  private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
-  private ResourceScheduler scheduler;
-  private ReservationSystem reservationSystem;
-  private NodesListManager nodesListManager;
-  private ResourceTrackerService resourceTrackerService;
-  private ApplicationMasterService applicationMasterService;
-  private RMApplicationHistoryWriter rmApplicationHistoryWriter;
-  private SystemMetricsPublisher systemMetricsPublisher;
+
   private ConfigurationProvider configurationProvider;
-  private RMNodeLabelsManager nodeLabelManager;
-  private long epoch;
-  private Clock systemClock = new SystemClock();
-  private long schedulerRecoveryStartTime = 0;
-  private long schedulerRecoveryWaitTime = 0;
-  private boolean printLog = true;
-  private boolean isSchedulerReady = false;
 
-  private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
+  private RMActiveServiceContext activeServiceContext;
 
   /**
    * Default constructor. To be used in conjunction with setter methods for
@@ -128,24 +87,11 @@ public class RMContextImpl implements RMContext {
       RMApplicationHistoryWriter rmApplicationHistoryWriter) {
     this();
     this.setDispatcher(rmDispatcher);
-    this.setContainerAllocationExpirer(containerAllocationExpirer);
-    this.setAMLivelinessMonitor(amLivelinessMonitor);
-    this.setAMFinishingMonitor(amFinishingMonitor);
-    this.setDelegationTokenRenewer(delegationTokenRenewer);
-    this.setAMRMTokenSecretManager(appTokenSecretManager);
-    this.setContainerTokenSecretManager(containerTokenSecretManager);
-    this.setNMTokenSecretManager(nmTokenSecretManager);
-    this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
-    this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
-
-    RMStateStore nullStore = new NullRMStateStore();
-    nullStore.setRMDispatcher(rmDispatcher);
-    try {
-      nullStore.init(new YarnConfiguration());
-      setStateStore(nullStore);
-    } catch (Exception e) {
-      assert false;
-    }
+    setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
+        containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+        delegationTokenRenewer, appTokenSecretManager,
+        containerTokenSecretManager, nmTokenSecretManager,
+        clientToAMTokenSecretManager, rmApplicationHistoryWriter));
 
     ConfigurationProvider provider = new LocalConfigurationProvider();
     setConfigurationProvider(provider);
@@ -155,80 +101,80 @@ public class RMContextImpl implements RMContext {
   public Dispatcher getDispatcher() {
     return this.rmDispatcher;
   }
-  
-  @Override 
+
+  @Override
   public RMStateStore getStateStore() {
-    return stateStore;
+    return activeServiceContext.getStateStore();
   }
 
   @Override
   public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
-    return this.applications;
+    return activeServiceContext.getRMApps();
   }
 
   @Override
   public ConcurrentMap<NodeId, RMNode> getRMNodes() {
-    return this.nodes;
+    return activeServiceContext.getRMNodes();
   }
-  
+
   @Override
   public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
-    return this.inactiveNodes;
+    return activeServiceContext.getInactiveRMNodes();
   }
 
   @Override
   public ContainerAllocationExpirer getContainerAllocationExpirer() {
-    return this.containerAllocationExpirer;
+    return activeServiceContext.getContainerAllocationExpirer();
   }
 
   @Override
   public AMLivelinessMonitor getAMLivelinessMonitor() {
-    return this.amLivelinessMonitor;
+    return activeServiceContext.getAMLivelinessMonitor();
   }
 
   @Override
   public AMLivelinessMonitor getAMFinishingMonitor() {
-    return this.amFinishingMonitor;
+    return activeServiceContext.getAMFinishingMonitor();
   }
 
   @Override
   public DelegationTokenRenewer getDelegationTokenRenewer() {
-    return delegationTokenRenewer;
+    return activeServiceContext.getDelegationTokenRenewer();
   }
 
   @Override
   public AMRMTokenSecretManager getAMRMTokenSecretManager() {
-    return this.amRMTokenSecretManager;
+    return activeServiceContext.getAMRMTokenSecretManager();
   }
 
   @Override
   public RMContainerTokenSecretManager getContainerTokenSecretManager() {
-    return this.containerTokenSecretManager;
+    return activeServiceContext.getContainerTokenSecretManager();
   }
-  
+
   @Override
   public NMTokenSecretManagerInRM getNMTokenSecretManager() {
-    return this.nmTokenSecretManager;
+    return activeServiceContext.getNMTokenSecretManager();
   }
 
   @Override
   public ResourceScheduler getScheduler() {
-    return this.scheduler;
+    return activeServiceContext.getScheduler();
   }
 
   @Override
   public ReservationSystem getReservationSystem() {
-    return this.reservationSystem;
+    return activeServiceContext.getReservationSystem();
   }
-  
+
   @Override
   public NodesListManager getNodesListManager() {
-    return this.nodesListManager;
+    return activeServiceContext.getNodesListManager();
   }
 
   @Override
   public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
-    return this.clientToAMTokenSecretManager;
+    return activeServiceContext.getClientToAMTokenSecretManager();
   }
 
   @Override
@@ -238,22 +184,22 @@ public class RMContextImpl implements RMContext {
 
   @VisibleForTesting
   public void setStateStore(RMStateStore store) {
-    stateStore = store;
+    activeServiceContext.setStateStore(store);
   }
-  
+
   @Override
   public ClientRMService getClientRMService() {
-    return this.clientRMService;
+    return activeServiceContext.getClientRMService();
   }
 
   @Override
   public ApplicationMasterService getApplicationMasterService() {
-    return applicationMasterService;
+    return activeServiceContext.getApplicationMasterService();
   }
 
   @Override
   public ResourceTrackerService getResourceTrackerService() {
-    return resourceTrackerService;
+    return activeServiceContext.getResourceTrackerService();
   }
 
   void setHAEnabled(boolean isHAEnabled) {
@@ -276,78 +222,78 @@ public class RMContextImpl implements RMContext {
 
   @Override
   public void setClientRMService(ClientRMService clientRMService) {
-    this.clientRMService = clientRMService;
+    activeServiceContext.setClientRMService(clientRMService);
   }
-  
+
   @Override
   public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
-    return this.rmDelegationTokenSecretManager;
+    return activeServiceContext.getRMDelegationTokenSecretManager();
   }
-  
+
   @Override
   public void setRMDelegationTokenSecretManager(
       RMDelegationTokenSecretManager delegationTokenSecretManager) {
-    this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
+    activeServiceContext
+        .setRMDelegationTokenSecretManager(delegationTokenSecretManager);
   }
 
   void setContainerAllocationExpirer(
       ContainerAllocationExpirer containerAllocationExpirer) {
-    this.containerAllocationExpirer = containerAllocationExpirer;
+    activeServiceContext
+        .setContainerAllocationExpirer(containerAllocationExpirer);
   }
 
   void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
-    this.amLivelinessMonitor = amLivelinessMonitor;
+    activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor);
   }
 
   void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
-    this.amFinishingMonitor = amFinishingMonitor;
+    activeServiceContext.setAMFinishingMonitor(amFinishingMonitor);
   }
 
   void setContainerTokenSecretManager(
       RMContainerTokenSecretManager containerTokenSecretManager) {
-    this.containerTokenSecretManager = containerTokenSecretManager;
+    activeServiceContext
+        .setContainerTokenSecretManager(containerTokenSecretManager);
   }
 
-  void setNMTokenSecretManager(
-      NMTokenSecretManagerInRM nmTokenSecretManager) {
-    this.nmTokenSecretManager = nmTokenSecretManager;
+  void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
+    activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
   }
 
   void setScheduler(ResourceScheduler scheduler) {
-    this.scheduler = scheduler;
+    activeServiceContext.setScheduler(scheduler);
   }
-  
+
   void setReservationSystem(ReservationSystem reservationSystem) {
-    this.reservationSystem = reservationSystem;
+    activeServiceContext.setReservationSystem(reservationSystem);
   }
 
-  void setDelegationTokenRenewer(
-      DelegationTokenRenewer delegationTokenRenewer) {
-    this.delegationTokenRenewer = delegationTokenRenewer;
+  void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
+    activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer);
   }
 
   void setClientToAMTokenSecretManager(
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
-    this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
+    activeServiceContext
+        .setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
   }
 
-  void setAMRMTokenSecretManager(
-      AMRMTokenSecretManager amRMTokenSecretManager) {
-    this.amRMTokenSecretManager = amRMTokenSecretManager;
+  void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
+    activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager);
   }
 
   void setNodesListManager(NodesListManager nodesListManager) {
-    this.nodesListManager = nodesListManager;
+    activeServiceContext.setNodesListManager(nodesListManager);
   }
 
   void setApplicationMasterService(
       ApplicationMasterService applicationMasterService) {
-    this.applicationMasterService = applicationMasterService;
+    activeServiceContext.setApplicationMasterService(applicationMasterService);
   }
 
-  void setResourceTrackerService(
-      ResourceTrackerService resourceTrackerService) {
-    this.resourceTrackerService = resourceTrackerService;
+  void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
+    activeServiceContext.setResourceTrackerService(resourceTrackerService);
   }
 
   @Override
@@ -363,34 +309,35 @@ public class RMContextImpl implements RMContext {
   }
 
   public void setWorkPreservingRecoveryEnabled(boolean enabled) {
-    this.isWorkPreservingRecoveryEnabled = enabled;
+    activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
   }
 
   @Override
   public boolean isWorkPreservingRecoveryEnabled() {
-    return this.isWorkPreservingRecoveryEnabled;
+    return activeServiceContext.isWorkPreservingRecoveryEnabled();
   }
 
   @Override
   public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
-    return rmApplicationHistoryWriter;
+    return activeServiceContext.getRMApplicationHistoryWriter();
   }
 
   @Override
   public void setSystemMetricsPublisher(
       SystemMetricsPublisher systemMetricsPublisher) {
-    this.systemMetricsPublisher = systemMetricsPublisher;
+    activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
   }
 
   @Override
   public SystemMetricsPublisher getSystemMetricsPublisher() {
-    return systemMetricsPublisher;
+    return activeServiceContext.getSystemMetricsPublisher();
   }
 
   @Override
   public void setRMApplicationHistoryWriter(
       RMApplicationHistoryWriter rmApplicationHistoryWriter) {
-    this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+    activeServiceContext
+        .setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
   }
 
   @Override
@@ -405,51 +352,51 @@ public class RMContextImpl implements RMContext {
 
   @Override
   public long getEpoch() {
-    return this.epoch;
+    return activeServiceContext.getEpoch();
   }
 
   void setEpoch(long epoch) {
-    this.epoch = epoch;
+    activeServiceContext.setEpoch(epoch);
   }
 
   @Override
   public RMNodeLabelsManager getNodeLabelManager() {
-    return nodeLabelManager;
+    return activeServiceContext.getNodeLabelManager();
   }
-  
+
   @Override
   public void setNodeLabelManager(RMNodeLabelsManager mgr) {
-    nodeLabelManager = mgr;
+    activeServiceContext.setNodeLabelManager(mgr);
   }
 
   public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
-    this.schedulerRecoveryStartTime = systemClock.getTime();
-    this.schedulerRecoveryWaitTime = waitTime;
+    activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
   }
 
   public boolean isSchedulerReadyForAllocatingContainers() {
-    if (isSchedulerReady) {
-      return isSchedulerReady;
-    }
-    isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
-        > schedulerRecoveryWaitTime;
-    if (!isSchedulerReady && printLog) {
-      LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
-      printLog = false;
-    }
-    if (isSchedulerReady) {
-      LOG.info("Scheduler recovery is done. Start allocating new containers.");
-    }
-    return isSchedulerReady;
+    return activeServiceContext.isSchedulerReadyForAllocatingContainers();
   }
 
   @Private
   @VisibleForTesting
   public void setSystemClock(Clock clock) {
-    this.systemClock = clock;
+    activeServiceContext.setSystemClock(clock);
   }
 
   public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
-    return systemCredentials;
+    return activeServiceContext.getSystemCredentialsForApps();
   }
+
+  @Private
+  @Unstable
+  public RMActiveServiceContext getActiveServiceContext() {
+    return activeServiceContext;
+  }
+
+  @Private
+  @Unstable
+  void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
+    this.activeServiceContext = activeServiceContext;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index e0840b6..3ce42a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -400,6 +400,7 @@ public class ResourceManager extends CompositeService implements Recoverable
{
     private ContainerAllocationExpirer containerAllocationExpirer;
     private ResourceManager rm;
     private boolean recoveryEnabled;
+    private RMActiveServiceContext activeServiceContext;
 
     RMActiveServices(ResourceManager rm) {
       super("RMActiveServices");
@@ -408,6 +409,9 @@ public class ResourceManager extends CompositeService implements Recoverable
{
 
     @Override
     protected void serviceInit(Configuration configuration) throws Exception {
+      activeServiceContext = new RMActiveServiceContext();
+      rmContext.setActiveServiceContext(activeServiceContext);
+
       conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
       rmSecretManagerService = createRMSecretManagerService();
@@ -1008,11 +1012,15 @@ public class ResourceManager extends CompositeService implements Recoverable
{
     if (activeServices != null) {
       activeServices.stop();
       activeServices = null;
-      rmContext.getRMNodes().clear();
-      rmContext.getInactiveRMNodes().clear();
-      rmContext.getRMApps().clear();
-      ClusterMetrics.destroy();
-      QueueMetrics.clearQueueMetrics();
+    }
+  }
+
+  void reinitialize(boolean initialize) throws Exception {
+    ClusterMetrics.destroy();
+    QueueMetrics.clearQueueMetrics();
+    if (initialize) {
+      resetDispatcher();
+      createAndInitActiveServices();
     }
   }
 
@@ -1036,8 +1044,7 @@ public class ResourceManager extends CompositeService implements Recoverable
{
           startActiveServices();
           return null;
         } catch (Exception e) {
-          resetDispatcher();
-          createAndInitActiveServices();
+          reinitialize(true);
           throw e;
         }
       }
@@ -1059,10 +1066,7 @@ public class ResourceManager extends CompositeService implements Recoverable
{
     if (rmContext.getHAServiceState() ==
         HAServiceProtocol.HAServiceState.ACTIVE) {
       stopActiveServices();
-      if (initialize) {
-        resetDispatcher();
-        createAndInitActiveServices();
-      }
+      reinitialize(initialize);
     }
     rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
     LOG.info("Transitioned to standby state");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index c6d7d09..122eb60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -513,6 +513,68 @@ public class TestRMHA {
     rm.stop();
   }
 
+  @Test
+  public void testFailoverClearsRMContext() throws Exception {
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    Configuration conf = new YarnConfiguration(configuration);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 1. start RM
+    rm = new MockRM(conf, memStore);
+    rm.init(conf);
+    rm.start();
+
+    StateChangeRequestInfo requestInfo =
+        new StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+
+    // 2. Transition to active
+    rm.adminService.transitionToActive(requestInfo);
+    checkMonitorHealth();
+    checkActiveRMFunctionality();
+    verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
+    assertEquals(1, rm.getRMContext().getRMNodes().size());
+    assertEquals(1, rm.getRMContext().getRMApps().size());
+
+    // 3. Create new RM
+    rm = new MockRM(conf, memStore) {
+      @Override
+      protected ResourceTrackerService createResourceTrackerService() {
+        return new ResourceTrackerService(this.rmContext,
+            this.nodesListManager, this.nmLivelinessMonitor,
+            this.rmContext.getContainerTokenSecretManager(),
+            this.rmContext.getNMTokenSecretManager()) {
+          @Override
+          protected void serviceStart() throws Exception {
+            throw new Exception("ResourceTracker service failed");
+          }
+        };
+      }
+    };
+    rm.init(conf);
+    rm.start();
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+
+    // 4. Try Transition to active, throw exception
+    try {
+      rm.adminService.transitionToActive(requestInfo);
+      Assert.fail("Transitioned to Active should throw exception.");
+    } catch (Exception e) {
+      assertTrue("Error when transitioning to Active mode".contains(e
+          .getMessage()));
+    }
+    // 5. Clears the metrics
+    verifyClusterMetrics(0, 0, 0, 0, 0, 0);
+    assertEquals(0, rm.getRMContext().getRMNodes().size());
+    assertEquals(0, rm.getRMContext().getRMApps().size());
+  }
+
   public void innerTestHAWithRMHostName(boolean includeBindHost) {
     //this is run two times, with and without a bind host configured
     if (includeBindHost) {


Mime
View raw message