Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 09A9210DE1 for ; Thu, 20 Nov 2014 03:49:28 +0000 (UTC) Received: (qmail 48798 invoked by uid 500); 20 Nov 2014 03:49:27 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 48724 invoked by uid 500); 20 Nov 2014 03:49:27 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 48715 invoked by uid 99); 20 Nov 2014 03:49:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Nov 2014 03:49:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 46D268A35CE; Thu, 20 Nov 2014 03:49:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks Date: Thu, 20 Nov 2014 03:49:27 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 765aecb4e -> 9cb8b75ba YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9cb8b75b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9cb8b75b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9cb8b75b Branch: refs/heads/trunk Commit: 9cb8b75ba57f18639492bfa3b7e7c11c00bb3d3b Parents: 765aecb Author: Jian He Authored: Wed Nov 19 19:48:33 2014 -0800 Committer: Jian He Committed: Wed Nov 19 19:48:52 2014 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/RMActiveServiceContext.java | 455 +++++++++++++++++++ .../server/resourcemanager/RMContextImpl.java | 235 ++++------ .../server/resourcemanager/ResourceManager.java | 26 +- .../yarn/server/resourcemanager/TestRMHA.java | 62 +++ 5 files changed, 626 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7f61601..683f38c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -115,6 +115,9 @@ Release 2.7.0 - UNRELEASED YARN-2878. Fix DockerContainerExecutor.apt.vm formatting. (Abin Shahab via jianhe) + YARN-2865. Fixed RM to always create a new RMContext when transtions from + StandBy to Active. (Rohith Sharmaks via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java new file mode 100644 index 0000000..3bc2e9b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -0,0 +1,455 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +/** + * The RMActiveServiceContext is the class that maintains all the + * RMActiveService contexts.This is expected to be used only by ResourceManager + * and RMContext. + */ +@Private +@Unstable +public class RMActiveServiceContext { + + private static final Log LOG = LogFactory + .getLog(RMActiveServiceContext.class); + + private final ConcurrentMap applications = + new ConcurrentHashMap(); + + private final ConcurrentMap nodes = + new ConcurrentHashMap(); + + private final ConcurrentMap inactiveNodes = + new ConcurrentHashMap(); + + private final ConcurrentMap systemCredentials = + new ConcurrentHashMap(); + + private boolean isWorkPreservingRecoveryEnabled; + + private AMLivelinessMonitor amLivelinessMonitor; + private AMLivelinessMonitor amFinishingMonitor; + private RMStateStore stateStore = null; + private ContainerAllocationExpirer containerAllocationExpirer; + private DelegationTokenRenewer delegationTokenRenewer; + private AMRMTokenSecretManager amRMTokenSecretManager; + private RMContainerTokenSecretManager containerTokenSecretManager; + private NMTokenSecretManagerInRM nmTokenSecretManager; + private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private ClientRMService clientRMService; + private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; + private ResourceScheduler scheduler; + private ReservationSystem reservationSystem; + private NodesListManager nodesListManager; + private ResourceTrackerService resourceTrackerService; + private ApplicationMasterService applicationMasterService; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; + private SystemMetricsPublisher systemMetricsPublisher; + private RMNodeLabelsManager nodeLabelManager; + private long epoch; + private Clock systemClock = new SystemClock(); + private long schedulerRecoveryStartTime = 0; + private long schedulerRecoveryWaitTime = 0; + private boolean printLog = true; + private boolean isSchedulerReady = false; + + public RMActiveServiceContext() { + + } + + @Private + @Unstable + public RMActiveServiceContext(Dispatcher rmDispatcher, + ContainerAllocationExpirer containerAllocationExpirer, + AMLivelinessMonitor amLivelinessMonitor, + AMLivelinessMonitor amFinishingMonitor, + DelegationTokenRenewer delegationTokenRenewer, + AMRMTokenSecretManager appTokenSecretManager, + RMContainerTokenSecretManager containerTokenSecretManager, + NMTokenSecretManagerInRM nmTokenSecretManager, + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this(); + this.setContainerAllocationExpirer(containerAllocationExpirer); + this.setAMLivelinessMonitor(amLivelinessMonitor); + this.setAMFinishingMonitor(amFinishingMonitor); + this.setDelegationTokenRenewer(delegationTokenRenewer); + this.setAMRMTokenSecretManager(appTokenSecretManager); + this.setContainerTokenSecretManager(containerTokenSecretManager); + this.setNMTokenSecretManager(nmTokenSecretManager); + this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); + this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + + RMStateStore nullStore = new NullRMStateStore(); + nullStore.setRMDispatcher(rmDispatcher); + try { + nullStore.init(new YarnConfiguration()); + setStateStore(nullStore); + } catch (Exception e) { + assert false; + } + } + + @Private + @Unstable + public void setStateStore(RMStateStore store) { + stateStore = store; + } + + @Private + @Unstable + public ClientRMService getClientRMService() { + return clientRMService; + } + + @Private + @Unstable + public ApplicationMasterService getApplicationMasterService() { + return applicationMasterService; + } + + @Private + @Unstable + public ResourceTrackerService getResourceTrackerService() { + return resourceTrackerService; + } + + @Private + @Unstable + public RMStateStore getStateStore() { + return stateStore; + } + + @Private + @Unstable + public ConcurrentMap getRMApps() { + return this.applications; + } + + @Private + @Unstable + public ConcurrentMap getRMNodes() { + return this.nodes; + } + + @Private + @Unstable + public ConcurrentMap getInactiveRMNodes() { + return this.inactiveNodes; + } + + @Private + @Unstable + public ContainerAllocationExpirer getContainerAllocationExpirer() { + return this.containerAllocationExpirer; + } + + @Private + @Unstable + public AMLivelinessMonitor getAMLivelinessMonitor() { + return this.amLivelinessMonitor; + } + + @Private + @Unstable + public AMLivelinessMonitor getAMFinishingMonitor() { + return this.amFinishingMonitor; + } + + @Private + @Unstable + public DelegationTokenRenewer getDelegationTokenRenewer() { + return delegationTokenRenewer; + } + + @Private + @Unstable + public AMRMTokenSecretManager getAMRMTokenSecretManager() { + return this.amRMTokenSecretManager; + } + + @Private + @Unstable + public RMContainerTokenSecretManager getContainerTokenSecretManager() { + return this.containerTokenSecretManager; + } + + @Private + @Unstable + public NMTokenSecretManagerInRM getNMTokenSecretManager() { + return this.nmTokenSecretManager; + } + + @Private + @Unstable + public ResourceScheduler getScheduler() { + return this.scheduler; + } + + @Private + @Unstable + public ReservationSystem getReservationSystem() { + return this.reservationSystem; + } + + @Private + @Unstable + public NodesListManager getNodesListManager() { + return this.nodesListManager; + } + + @Private + @Unstable + public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { + return this.clientToAMTokenSecretManager; + } + + @Private + @Unstable + public void setClientRMService(ClientRMService clientRMService) { + this.clientRMService = clientRMService; + } + + @Private + @Unstable + public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { + return this.rmDelegationTokenSecretManager; + } + + @Private + @Unstable + public void setRMDelegationTokenSecretManager( + RMDelegationTokenSecretManager delegationTokenSecretManager) { + this.rmDelegationTokenSecretManager = delegationTokenSecretManager; + } + + @Private + @Unstable + void setContainerAllocationExpirer( + ContainerAllocationExpirer containerAllocationExpirer) { + this.containerAllocationExpirer = containerAllocationExpirer; + } + + @Private + @Unstable + void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { + this.amLivelinessMonitor = amLivelinessMonitor; + } + + @Private + @Unstable + void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { + this.amFinishingMonitor = amFinishingMonitor; + } + + @Private + @Unstable + void setContainerTokenSecretManager( + RMContainerTokenSecretManager containerTokenSecretManager) { + this.containerTokenSecretManager = containerTokenSecretManager; + } + + @Private + @Unstable + void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) { + this.nmTokenSecretManager = nmTokenSecretManager; + } + + @Private + @Unstable + void setScheduler(ResourceScheduler scheduler) { + this.scheduler = scheduler; + } + + @Private + @Unstable + void setReservationSystem(ReservationSystem reservationSystem) { + this.reservationSystem = reservationSystem; + } + + @Private + @Unstable + void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) { + this.delegationTokenRenewer = delegationTokenRenewer; + } + + @Private + @Unstable + void setClientToAMTokenSecretManager( + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { + this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + } + + @Private + @Unstable + void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) { + this.amRMTokenSecretManager = amRMTokenSecretManager; + } + + @Private + @Unstable + void setNodesListManager(NodesListManager nodesListManager) { + this.nodesListManager = nodesListManager; + } + + @Private + @Unstable + void setApplicationMasterService( + ApplicationMasterService applicationMasterService) { + this.applicationMasterService = applicationMasterService; + } + + @Private + @Unstable + void setResourceTrackerService(ResourceTrackerService resourceTrackerService) { + this.resourceTrackerService = resourceTrackerService; + } + + @Private + @Unstable + public void setWorkPreservingRecoveryEnabled(boolean enabled) { + this.isWorkPreservingRecoveryEnabled = enabled; + } + + @Private + @Unstable + public boolean isWorkPreservingRecoveryEnabled() { + return this.isWorkPreservingRecoveryEnabled; + } + + @Private + @Unstable + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return rmApplicationHistoryWriter; + } + + @Private + @Unstable + public void setSystemMetricsPublisher( + SystemMetricsPublisher systemMetricsPublisher) { + this.systemMetricsPublisher = systemMetricsPublisher; + } + + @Private + @Unstable + public SystemMetricsPublisher getSystemMetricsPublisher() { + return systemMetricsPublisher; + } + + @Private + @Unstable + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; + } + + @Private + @Unstable + public long getEpoch() { + return this.epoch; + } + + @Private + @Unstable + void setEpoch(long epoch) { + this.epoch = epoch; + } + + @Private + @Unstable + public RMNodeLabelsManager getNodeLabelManager() { + return nodeLabelManager; + } + + @Private + @Unstable + public void setNodeLabelManager(RMNodeLabelsManager mgr) { + nodeLabelManager = mgr; + } + + @Private + @Unstable + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { + this.schedulerRecoveryStartTime = systemClock.getTime(); + this.schedulerRecoveryWaitTime = waitTime; + } + + @Private + @Unstable + public boolean isSchedulerReadyForAllocatingContainers() { + if (isSchedulerReady) { + return isSchedulerReady; + } + isSchedulerReady = + (systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime; + if (!isSchedulerReady && printLog) { + LOG.info("Skip allocating containers. Scheduler is waiting for recovery."); + printLog = false; + } + if (isSchedulerReady) { + LOG.info("Scheduler recovery is done. Start allocating new containers."); + } + return isSchedulerReady; + } + + @Private + @Unstable + public void setSystemClock(Clock clock) { + this.systemClock = clock; + } + + @Private + @Unstable + public ConcurrentMap getSystemCredentialsForApps() { + return systemCredentials; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 7c1db3d..55d7667 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -19,24 +19,20 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import com.google.common.annotations.VisibleForTesting; @@ -59,52 +54,16 @@ public class RMContextImpl implements RMContext { private Dispatcher rmDispatcher; - private final ConcurrentMap applications - = new ConcurrentHashMap(); - - private final ConcurrentMap nodes - = new ConcurrentHashMap(); - - private final ConcurrentMap inactiveNodes - = new ConcurrentHashMap(); - - private final ConcurrentMap systemCredentials = - new ConcurrentHashMap(); - private boolean isHAEnabled; - private boolean isWorkPreservingRecoveryEnabled; + private HAServiceState haServiceState = HAServiceProtocol.HAServiceState.INITIALIZING; - - private AMLivelinessMonitor amLivelinessMonitor; - private AMLivelinessMonitor amFinishingMonitor; - private RMStateStore stateStore = null; - private ContainerAllocationExpirer containerAllocationExpirer; - private DelegationTokenRenewer delegationTokenRenewer; - private AMRMTokenSecretManager amRMTokenSecretManager; - private RMContainerTokenSecretManager containerTokenSecretManager; - private NMTokenSecretManagerInRM nmTokenSecretManager; - private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private AdminService adminService; - private ClientRMService clientRMService; - private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; - private ResourceScheduler scheduler; - private ReservationSystem reservationSystem; - private NodesListManager nodesListManager; - private ResourceTrackerService resourceTrackerService; - private ApplicationMasterService applicationMasterService; - private RMApplicationHistoryWriter rmApplicationHistoryWriter; - private SystemMetricsPublisher systemMetricsPublisher; + private ConfigurationProvider configurationProvider; - private RMNodeLabelsManager nodeLabelManager; - private long epoch; - private Clock systemClock = new SystemClock(); - private long schedulerRecoveryStartTime = 0; - private long schedulerRecoveryWaitTime = 0; - private boolean printLog = true; - private boolean isSchedulerReady = false; - private static final Log LOG = LogFactory.getLog(RMContextImpl.class); + private RMActiveServiceContext activeServiceContext; /** * Default constructor. To be used in conjunction with setter methods for @@ -128,24 +87,11 @@ public class RMContextImpl implements RMContext { RMApplicationHistoryWriter rmApplicationHistoryWriter) { this(); this.setDispatcher(rmDispatcher); - this.setContainerAllocationExpirer(containerAllocationExpirer); - this.setAMLivelinessMonitor(amLivelinessMonitor); - this.setAMFinishingMonitor(amFinishingMonitor); - this.setDelegationTokenRenewer(delegationTokenRenewer); - this.setAMRMTokenSecretManager(appTokenSecretManager); - this.setContainerTokenSecretManager(containerTokenSecretManager); - this.setNMTokenSecretManager(nmTokenSecretManager); - this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); - this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - - RMStateStore nullStore = new NullRMStateStore(); - nullStore.setRMDispatcher(rmDispatcher); - try { - nullStore.init(new YarnConfiguration()); - setStateStore(nullStore); - } catch (Exception e) { - assert false; - } + setActiveServiceContext(new RMActiveServiceContext(rmDispatcher, + containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, + delegationTokenRenewer, appTokenSecretManager, + containerTokenSecretManager, nmTokenSecretManager, + clientToAMTokenSecretManager, rmApplicationHistoryWriter)); ConfigurationProvider provider = new LocalConfigurationProvider(); setConfigurationProvider(provider); @@ -155,80 +101,80 @@ public class RMContextImpl implements RMContext { public Dispatcher getDispatcher() { return this.rmDispatcher; } - - @Override + + @Override public RMStateStore getStateStore() { - return stateStore; + return activeServiceContext.getStateStore(); } @Override public ConcurrentMap getRMApps() { - return this.applications; + return activeServiceContext.getRMApps(); } @Override public ConcurrentMap getRMNodes() { - return this.nodes; + return activeServiceContext.getRMNodes(); } - + @Override public ConcurrentMap getInactiveRMNodes() { - return this.inactiveNodes; + return activeServiceContext.getInactiveRMNodes(); } @Override public ContainerAllocationExpirer getContainerAllocationExpirer() { - return this.containerAllocationExpirer; + return activeServiceContext.getContainerAllocationExpirer(); } @Override public AMLivelinessMonitor getAMLivelinessMonitor() { - return this.amLivelinessMonitor; + return activeServiceContext.getAMLivelinessMonitor(); } @Override public AMLivelinessMonitor getAMFinishingMonitor() { - return this.amFinishingMonitor; + return activeServiceContext.getAMFinishingMonitor(); } @Override public DelegationTokenRenewer getDelegationTokenRenewer() { - return delegationTokenRenewer; + return activeServiceContext.getDelegationTokenRenewer(); } @Override public AMRMTokenSecretManager getAMRMTokenSecretManager() { - return this.amRMTokenSecretManager; + return activeServiceContext.getAMRMTokenSecretManager(); } @Override public RMContainerTokenSecretManager getContainerTokenSecretManager() { - return this.containerTokenSecretManager; + return activeServiceContext.getContainerTokenSecretManager(); } - + @Override public NMTokenSecretManagerInRM getNMTokenSecretManager() { - return this.nmTokenSecretManager; + return activeServiceContext.getNMTokenSecretManager(); } @Override public ResourceScheduler getScheduler() { - return this.scheduler; + return activeServiceContext.getScheduler(); } @Override public ReservationSystem getReservationSystem() { - return this.reservationSystem; + return activeServiceContext.getReservationSystem(); } - + @Override public NodesListManager getNodesListManager() { - return this.nodesListManager; + return activeServiceContext.getNodesListManager(); } @Override public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { - return this.clientToAMTokenSecretManager; + return activeServiceContext.getClientToAMTokenSecretManager(); } @Override @@ -238,22 +184,22 @@ public class RMContextImpl implements RMContext { @VisibleForTesting public void setStateStore(RMStateStore store) { - stateStore = store; + activeServiceContext.setStateStore(store); } - + @Override public ClientRMService getClientRMService() { - return this.clientRMService; + return activeServiceContext.getClientRMService(); } @Override public ApplicationMasterService getApplicationMasterService() { - return applicationMasterService; + return activeServiceContext.getApplicationMasterService(); } @Override public ResourceTrackerService getResourceTrackerService() { - return resourceTrackerService; + return activeServiceContext.getResourceTrackerService(); } void setHAEnabled(boolean isHAEnabled) { @@ -276,78 +222,78 @@ public class RMContextImpl implements RMContext { @Override public void setClientRMService(ClientRMService clientRMService) { - this.clientRMService = clientRMService; + activeServiceContext.setClientRMService(clientRMService); } - + @Override public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { - return this.rmDelegationTokenSecretManager; + return activeServiceContext.getRMDelegationTokenSecretManager(); } - + @Override public void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager) { - this.rmDelegationTokenSecretManager = delegationTokenSecretManager; + activeServiceContext + .setRMDelegationTokenSecretManager(delegationTokenSecretManager); } void setContainerAllocationExpirer( ContainerAllocationExpirer containerAllocationExpirer) { - this.containerAllocationExpirer = containerAllocationExpirer; + activeServiceContext + .setContainerAllocationExpirer(containerAllocationExpirer); } void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { - this.amLivelinessMonitor = amLivelinessMonitor; + activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor); } void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { - this.amFinishingMonitor = amFinishingMonitor; + activeServiceContext.setAMFinishingMonitor(amFinishingMonitor); } void setContainerTokenSecretManager( RMContainerTokenSecretManager containerTokenSecretManager) { - this.containerTokenSecretManager = containerTokenSecretManager; + activeServiceContext + .setContainerTokenSecretManager(containerTokenSecretManager); } - void setNMTokenSecretManager( - NMTokenSecretManagerInRM nmTokenSecretManager) { - this.nmTokenSecretManager = nmTokenSecretManager; + void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) { + activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager); } void setScheduler(ResourceScheduler scheduler) { - this.scheduler = scheduler; + activeServiceContext.setScheduler(scheduler); } - + void setReservationSystem(ReservationSystem reservationSystem) { - this.reservationSystem = reservationSystem; + activeServiceContext.setReservationSystem(reservationSystem); } - void setDelegationTokenRenewer( - DelegationTokenRenewer delegationTokenRenewer) { - this.delegationTokenRenewer = delegationTokenRenewer; + void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) { + activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer); } void setClientToAMTokenSecretManager( ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + activeServiceContext + .setClientToAMTokenSecretManager(clientToAMTokenSecretManager); } - void setAMRMTokenSecretManager( - AMRMTokenSecretManager amRMTokenSecretManager) { - this.amRMTokenSecretManager = amRMTokenSecretManager; + void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) { + activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager); } void setNodesListManager(NodesListManager nodesListManager) { - this.nodesListManager = nodesListManager; + activeServiceContext.setNodesListManager(nodesListManager); } void setApplicationMasterService( ApplicationMasterService applicationMasterService) { - this.applicationMasterService = applicationMasterService; + activeServiceContext.setApplicationMasterService(applicationMasterService); } - void setResourceTrackerService( - ResourceTrackerService resourceTrackerService) { - this.resourceTrackerService = resourceTrackerService; + void setResourceTrackerService(ResourceTrackerService resourceTrackerService) { + activeServiceContext.setResourceTrackerService(resourceTrackerService); } @Override @@ -363,34 +309,35 @@ public class RMContextImpl implements RMContext { } public void setWorkPreservingRecoveryEnabled(boolean enabled) { - this.isWorkPreservingRecoveryEnabled = enabled; + activeServiceContext.setWorkPreservingRecoveryEnabled(enabled); } @Override public boolean isWorkPreservingRecoveryEnabled() { - return this.isWorkPreservingRecoveryEnabled; + return activeServiceContext.isWorkPreservingRecoveryEnabled(); } @Override public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { - return rmApplicationHistoryWriter; + return activeServiceContext.getRMApplicationHistoryWriter(); } @Override public void setSystemMetricsPublisher( SystemMetricsPublisher systemMetricsPublisher) { - this.systemMetricsPublisher = systemMetricsPublisher; + activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher); } @Override public SystemMetricsPublisher getSystemMetricsPublisher() { - return systemMetricsPublisher; + return activeServiceContext.getSystemMetricsPublisher(); } @Override public void setRMApplicationHistoryWriter( RMApplicationHistoryWriter rmApplicationHistoryWriter) { - this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; + activeServiceContext + .setRMApplicationHistoryWriter(rmApplicationHistoryWriter); } @Override @@ -405,51 +352,51 @@ public class RMContextImpl implements RMContext { @Override public long getEpoch() { - return this.epoch; + return activeServiceContext.getEpoch(); } void setEpoch(long epoch) { - this.epoch = epoch; + activeServiceContext.setEpoch(epoch); } @Override public RMNodeLabelsManager getNodeLabelManager() { - return nodeLabelManager; + return activeServiceContext.getNodeLabelManager(); } - + @Override public void setNodeLabelManager(RMNodeLabelsManager mgr) { - nodeLabelManager = mgr; + activeServiceContext.setNodeLabelManager(mgr); } public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { - this.schedulerRecoveryStartTime = systemClock.getTime(); - this.schedulerRecoveryWaitTime = waitTime; + activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } public boolean isSchedulerReadyForAllocatingContainers() { - if (isSchedulerReady) { - return isSchedulerReady; - } - isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime) - > schedulerRecoveryWaitTime; - if (!isSchedulerReady && printLog) { - LOG.info("Skip allocating containers. Scheduler is waiting for recovery."); - printLog = false; - } - if (isSchedulerReady) { - LOG.info("Scheduler recovery is done. Start allocating new containers."); - } - return isSchedulerReady; + return activeServiceContext.isSchedulerReadyForAllocatingContainers(); } @Private @VisibleForTesting public void setSystemClock(Clock clock) { - this.systemClock = clock; + activeServiceContext.setSystemClock(clock); } public ConcurrentMap getSystemCredentialsForApps() { - return systemCredentials; + return activeServiceContext.getSystemCredentialsForApps(); } + + @Private + @Unstable + public RMActiveServiceContext getActiveServiceContext() { + return activeServiceContext; + } + + @Private + @Unstable + void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { + this.activeServiceContext = activeServiceContext; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e0840b6..3ce42a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -400,6 +400,7 @@ public class ResourceManager extends CompositeService implements Recoverable { private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; private boolean recoveryEnabled; + private RMActiveServiceContext activeServiceContext; RMActiveServices(ResourceManager rm) { super("RMActiveServices"); @@ -408,6 +409,9 @@ public class ResourceManager extends CompositeService implements Recoverable { @Override protected void serviceInit(Configuration configuration) throws Exception { + activeServiceContext = new RMActiveServiceContext(); + rmContext.setActiveServiceContext(activeServiceContext); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); rmSecretManagerService = createRMSecretManagerService(); @@ -1008,11 +1012,15 @@ public class ResourceManager extends CompositeService implements Recoverable { if (activeServices != null) { activeServices.stop(); activeServices = null; - rmContext.getRMNodes().clear(); - rmContext.getInactiveRMNodes().clear(); - rmContext.getRMApps().clear(); - ClusterMetrics.destroy(); - QueueMetrics.clearQueueMetrics(); + } + } + + void reinitialize(boolean initialize) throws Exception { + ClusterMetrics.destroy(); + QueueMetrics.clearQueueMetrics(); + if (initialize) { + resetDispatcher(); + createAndInitActiveServices(); } } @@ -1036,8 +1044,7 @@ public class ResourceManager extends CompositeService implements Recoverable { startActiveServices(); return null; } catch (Exception e) { - resetDispatcher(); - createAndInitActiveServices(); + reinitialize(true); throw e; } } @@ -1059,10 +1066,7 @@ public class ResourceManager extends CompositeService implements Recoverable { if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) { stopActiveServices(); - if (initialize) { - resetDispatcher(); - createAndInitActiveServices(); - } + reinitialize(initialize); } rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); LOG.info("Transitioned to standby state"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9cb8b75b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index c6d7d09..122eb60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -513,6 +513,68 @@ public class TestRMHA { rm.stop(); } + @Test + public void testFailoverClearsRMContext() throws Exception { + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + Configuration conf = new YarnConfiguration(configuration); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 1. start RM + rm = new MockRM(conf, memStore); + rm.init(conf); + rm.start(); + + StateChangeRequestInfo requestInfo = + new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 2. Transition to active + rm.adminService.transitionToActive(requestInfo); + checkMonitorHealth(); + checkActiveRMFunctionality(); + verifyClusterMetrics(1, 1, 1, 1, 2048, 1); + assertEquals(1, rm.getRMContext().getRMNodes().size()); + assertEquals(1, rm.getRMContext().getRMApps().size()); + + // 3. Create new RM + rm = new MockRM(conf, memStore) { + @Override + protected ResourceTrackerService createResourceTrackerService() { + return new ResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()) { + @Override + protected void serviceStart() throws Exception { + throw new Exception("ResourceTracker service failed"); + } + }; + } + }; + rm.init(conf); + rm.start(); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 4. Try Transition to active, throw exception + try { + rm.adminService.transitionToActive(requestInfo); + Assert.fail("Transitioned to Active should throw exception."); + } catch (Exception e) { + assertTrue("Error when transitioning to Active mode".contains(e + .getMessage())); + } + // 5. Clears the metrics + verifyClusterMetrics(0, 0, 0, 0, 0, 0); + assertEquals(0, rm.getRMContext().getRMNodes().size()); + assertEquals(0, rm.getRMContext().getRMApps().size()); + } + public void innerTestHAWithRMHostName(boolean includeBindHost) { //this is run two times, with and without a bind host configured if (includeBindHost) {