Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CE59A200BC9 for ; Fri, 11 Nov 2016 23:42:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CCCE3160B17; Fri, 11 Nov 2016 22:42:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 29295160AF6 for ; Fri, 11 Nov 2016 23:42:58 +0100 (CET) Received: (qmail 94599 invoked by uid 500); 11 Nov 2016 22:42:49 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 93087 invoked by uid 99); 11 Nov 2016 22:42:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 22:42:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F291EE09B6; Fri, 11 Nov 2016 22:42:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Fri, 11 Nov 2016 22:43:17 -0000 Message-Id: <86410380d21840eba17e18af379eb22b@git.apache.org> In-Reply-To: <00c3cd38c7224cf6bee16084d5e2a137@git.apache.org> References: <00c3cd38c7224cf6bee16084d5e2a137@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [30/50] [abbrv] hadoop git commit: YARN-3671. Integrate Federation services with ResourceManager. Contributed by Subru Krishnan archived-at: Fri, 11 Nov 2016 22:43:00 -0000 YARN-3671. Integrate Federation services with ResourceManager. Contributed by Subru Krishnan Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02f3f84b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02f3f84b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02f3f84b Branch: refs/heads/YARN-2915 Commit: 02f3f84b5a557f27adcfcf2bb199ad5ff455eadc Parents: 59d87f4 Author: Jian He Authored: Tue Aug 30 12:20:52 2016 +0800 Committer: Subru Krishnan Committed: Fri Nov 11 14:20:46 2016 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 11 +- .../yarn/conf/TestYarnConfigurationFields.java | 4 +- .../failover/FederationProxyProviderUtil.java | 2 +- .../FederationRMFailoverProxyProvider.java | 4 +- ...ationMembershipStateStoreInputValidator.java | 7 +- .../TestFederationStateStoreInputValidator.java | 10 +- .../server/resourcemanager/ResourceManager.java | 26 ++ .../FederationStateStoreHeartbeat.java | 108 +++++++ .../federation/FederationStateStoreService.java | 304 +++++++++++++++++++ .../federation/package-info.java | 17 ++ .../webapp/dao/ClusterMetricsInfo.java | 5 +- .../TestFederationRMStateStoreService.java | 170 +++++++++++ 12 files changed, 648 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8abf1dc..c8cd027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2491,9 +2491,6 @@ public class YarnConfiguration extends Configuration { FEDERATION_PREFIX + "failover.enabled"; public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true; - public static final String FEDERATION_SUBCLUSTER_ID = - FEDERATION_PREFIX + "sub-cluster.id"; - public static final String FEDERATION_STATESTORE_CLIENT_CLASS = FEDERATION_PREFIX + "state-store.class"; @@ -2506,6 +2503,14 @@ public class YarnConfiguration extends Configuration { // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = + FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; + + // 5 minutes + public static final int + DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = + 5 * 60; + public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 4a5cc8c..4c08e71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -96,9 +96,9 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS); configurationPropsToSkipCompare - .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID); - configurationPropsToSkipCompare .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java index a986008..18f1338 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java @@ -134,7 +134,7 @@ public final class FederationProxyProviderUtil { // are based out of conf private static void updateConf(Configuration conf, SubClusterId subClusterId) { - conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId()); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); // In a Federation setting, we will connect to not just the local cluster RM // but also multiple external RMs. The membership information of all the RMs // that are currently http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java index 90a9239..0ffab0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -74,8 +74,8 @@ public class FederationRMFailoverProxyProvider this.protocol = proto; this.rmProxy.checkAllowedProtocols(this.protocol); String clusterId = - configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID); - Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId"); + configuration.get(YarnConfiguration.RM_CLUSTER_ID); + Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); this.subClusterId = SubClusterId.newInstance(clusterId); this.facade = facade.getInstance(); if (configuration instanceof YarnConfiguration) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java index b587ee5..ff9d8e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java @@ -159,7 +159,10 @@ public final class FederationMembershipStateStoreInputValidator { } /** - * Validate if the SubCluster Info are present or not. + * Validate if all the required fields on {@link SubClusterInfo} are present + * or not. {@code Capability} will be empty as the corresponding + * {@code ResourceManager} is in the process of initialization during + * registration. * * @param subClusterInfo the information of the subcluster to be verified * @throws FederationStateStoreInvalidInputException if the SubCluster Info @@ -194,8 +197,6 @@ public final class FederationMembershipStateStoreInputValidator { // validate subcluster state checkSubClusterState(subClusterInfo.getState()); - // validate subcluster capability - checkCapability(subClusterInfo.getCapability()); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java index 13175ae..b95f17a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java @@ -242,11 +242,8 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator .validateSubClusterRegisterRequest(request); - Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { - LOG.info(e.getMessage()); - Assert.assertTrue( - e.getMessage().startsWith("Invalid capability information.")); + Assert.fail(e.getMessage()); } // Execution with Empty Capability @@ -260,11 +257,8 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator .validateSubClusterRegisterRequest(request); - Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { - LOG.info(e.getMessage()); - Assert.assertTrue( - e.getMessage().startsWith("Invalid capability information.")); + Assert.fail(e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 03daeb9..049996b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; @@ -187,6 +188,7 @@ public class ResourceManager extends CompositeService implements Recoverable { protected RMAppManager rmAppManager; protected ApplicationACLsManager applicationACLsManager; protected QueueACLsManager queueACLsManager; + private FederationStateStoreService federationStateStoreService; private WebApp webApp; private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; @@ -479,6 +481,10 @@ public class ResourceManager extends CompositeService implements Recoverable { return new RMTimelineCollectorManager(rmContext); } + private FederationStateStoreService createFederationStateStoreService() { + return new FederationStateStoreService(rmContext); + } + protected SystemMetricsPublisher createSystemMetricsPublisher() { SystemMetricsPublisher publisher; if (YarnConfiguration.timelineServiceEnabled(conf) && @@ -705,6 +711,20 @@ public class ResourceManager extends CompositeService implements Recoverable { delegationTokenRenewer.setRMContext(rmContext); } + if(HAUtil.isFederationEnabled(conf)) { + String cId = YarnConfiguration.getClusterId(conf); + if (cId.isEmpty()) { + String errMsg = + "Cannot initialize RM as Federation is enabled" + + " but cluster id is not configured."; + LOG.error(errMsg); + throw new YarnRuntimeException(errMsg); + } + federationStateStoreService = createFederationStateStoreService(); + addIfService(federationStateStoreService); + LOG.info("Initialized Federation membership."); + } + new RMNMInfo(rmContext, scheduler); super.serviceInit(conf); @@ -1339,6 +1359,12 @@ public class ResourceManager extends CompositeService implements Recoverable { } @Private + @VisibleForTesting + public FederationStateStoreService getFederationStateStoreService() { + return this.federationStateStoreService; + } + + @Private WebApp getWebapp() { return this.webApp; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java new file mode 100644 index 0000000..a4618a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import java.io.StringWriter; + +import javax.xml.bind.JAXBException; + +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; + +/** + * Periodic heart beat from a ResourceManager participating in + * federation to indicate liveliness. The heart beat publishes the current + * capabilities as represented by {@link ClusterMetricsInfo} of the sub cluster. + * + */ +public class FederationStateStoreHeartbeat implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreHeartbeat.class); + + private SubClusterId subClusterId; + private FederationStateStore stateStoreService; + + private final ResourceScheduler rs; + + private StringWriter currentClusterState; + private JSONJAXBContext jc; + private JSONMarshaller marshaller; + private String capability; + + public FederationStateStoreHeartbeat(SubClusterId subClusterId, + FederationStateStore stateStoreClient, ResourceScheduler scheduler) { + this.stateStoreService = stateStoreClient; + this.subClusterId = subClusterId; + this.rs = scheduler; + // Initialize the JAXB Marshaller + this.currentClusterState = new StringWriter(); + try { + this.jc = new JSONJAXBContext( + JSONConfiguration.mapped().rootUnwrapping(false).build(), + ClusterMetricsInfo.class); + marshaller = jc.createJSONMarshaller(); + } catch (JAXBException e) { + LOG.warn("Exception while trying to initialize JAXB context.", e); + } + LOG.info("Initialized Federation membership for cluster with timestamp: " + + ResourceManager.getClusterTimeStamp()); + } + + /** + * Get the current cluster state as a JSON string representation of the + * {@link ClusterMetricsInfo}. + */ + private void updateClusterState() { + try { + // get the current state + currentClusterState.getBuffer().setLength(0); + ClusterMetricsInfo clusterMetricsInfo = new ClusterMetricsInfo(rs); + marshaller.marshallToJSON(clusterMetricsInfo, currentClusterState); + capability = currentClusterState.toString(); + } catch (Exception e) { + LOG.warn("Exception while trying to generate cluster state," + + " so reverting to last know state.", e); + } + } + + @Override + public synchronized void run() { + try { + updateClusterState(); + SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest + .newInstance(subClusterId, SubClusterState.SC_RUNNING, capability); + stateStoreService.subClusterHeartbeat(request); + LOG.debug("Sending the heartbeat with capability: {}", capability); + } catch (Exception e) { + LOG.warn("Exception when trying to heartbeat: ", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java new file mode 100644 index 0000000..9a01d7e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import java.net.InetSocketAddress; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Implements {@link FederationStateStore} and provides a service for + * participating in the federation membership. + */ +public class FederationStateStoreService extends AbstractService + implements FederationStateStore { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreService.class); + + private Configuration config; + private ScheduledExecutorService scheduledExecutorService; + private FederationStateStoreHeartbeat stateStoreHeartbeat; + private FederationStateStore stateStoreClient = null; + private SubClusterId subClusterId; + private long heartbeatInterval; + private RMContext rmContext; + + public FederationStateStoreService(RMContext rmContext) { + super(FederationStateStoreService.class.getName()); + LOG.info("FederationStateStoreService initialized"); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + this.config = conf; + + RetryPolicy retryPolicy = + FederationStateStoreFacade.createRetryPolicy(conf); + + this.stateStoreClient = + (FederationStateStore) FederationStateStoreFacade.createRetryInstance( + conf, YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS, + FederationStateStore.class, retryPolicy); + this.stateStoreClient.init(conf); + LOG.info("Initialized state store client class"); + + this.subClusterId = + SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); + + heartbeatInterval = conf.getLong( + YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); + if (heartbeatInterval <= 0) { + heartbeatInterval = + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS; + } + LOG.info("Initialized federation membership service."); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + + registerAndInitializeHeartbeat(); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + Exception ex = null; + try { + if (this.scheduledExecutorService != null + && !this.scheduledExecutorService.isShutdown()) { + this.scheduledExecutorService.shutdown(); + LOG.info("Stopped federation membership heartbeat"); + } + } catch (Exception e) { + LOG.error("Failed to shutdown ScheduledExecutorService", e); + ex = e; + } + + if (this.stateStoreClient != null) { + try { + deregisterSubCluster(SubClusterDeregisterRequest + .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED)); + } finally { + this.stateStoreClient.close(); + } + } + + if (ex != null) { + throw ex; + } + } + + // Return a client accessible string representation of the service address. + private String getServiceAddress(InetSocketAddress address) { + InetSocketAddress socketAddress = NetUtils.getConnectAddress(address); + return socketAddress.getAddress().getHostAddress() + ":" + + socketAddress.getPort(); + } + + private void registerAndInitializeHeartbeat() { + String clientRMAddress = + getServiceAddress(rmContext.getClientRMService().getBindAddress()); + String amRMAddress = getServiceAddress( + rmContext.getApplicationMasterService().getBindAddress()); + String rmAdminAddress = getServiceAddress( + config.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); + String webAppAddress = + WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(config); + + SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, + amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, + SubClusterState.SC_NEW, ResourceManager.getClusterTimeStamp(), ""); + try { + registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo)); + LOG.info("Successfully registered for federation subcluster: {}", + subClusterInfo); + } catch (Exception e) { + throw new YarnRuntimeException( + "Failed to register Federation membership with the StateStore", e); + } + stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId, + stateStoreClient, rmContext.getScheduler()); + scheduledExecutorService = + HadoopExecutors.newSingleThreadScheduledExecutor(); + scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat, + heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS); + LOG.info("Started federation membership heartbeat with interval: {}", + heartbeatInterval); + } + + @VisibleForTesting + public FederationStateStore getStateStoreClient() { + return stateStoreClient; + } + + @VisibleForTesting + public FederationStateStoreHeartbeat getStateStoreHeartbeatThread() { + return stateStoreHeartbeat; + } + + @Override + public Version getCurrentVersion() { + return stateStoreClient.getCurrentVersion(); + } + + @Override + public Version loadVersion() { + return stateStoreClient.getCurrentVersion(); + } + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + return stateStoreClient.getPolicyConfiguration(request); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + return stateStoreClient.setPolicyConfiguration(request); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + return stateStoreClient.getPoliciesConfigurations(request); + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest registerSubClusterRequest) + throws YarnException { + return stateStoreClient.registerSubCluster(registerSubClusterRequest); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest subClusterDeregisterRequest) + throws YarnException { + return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest subClusterHeartbeatRequest) + throws YarnException { + return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest subClusterRequest) throws YarnException { + return stateStoreClient.getSubCluster(subClusterRequest); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest subClustersRequest) throws YarnException { + return stateStoreClient.getSubClusters(subClustersRequest); + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + return stateStoreClient.addApplicationHomeSubCluster(request); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + return stateStoreClient.updateApplicationHomeSubCluster(request); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + return stateStoreClient.getApplicationHomeSubCluster(request); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + return stateStoreClient.getApplicationsHomeSubCluster(request); + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + return stateStoreClient.deleteApplicationHomeSubCluster(request); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java new file mode 100644 index 0000000..47c7c65 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.federation; http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index f083b05..dc42eb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -65,7 +65,10 @@ public class ClusterMetricsInfo { } // JAXB needs this public ClusterMetricsInfo(final ResourceManager rm) { - ResourceScheduler rs = rm.getResourceScheduler(); + this(rm.getResourceScheduler()); + } + + public ClusterMetricsInfo(final ResourceScheduler rs) { QueueMetrics metrics = rs.getRootQueueMetrics(); ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f3f84b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java new file mode 100644 index 0000000..30f69b5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import java.io.IOException; +import java.io.StringReader; + +import javax.xml.bind.JAXBException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONUnmarshaller; + +/** + * Unit tests for FederationStateStoreService. + */ +public class TestFederationRMStateStoreService { + + private final HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + private final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); + private final GetSubClusterInfoRequest request = + GetSubClusterInfoRequest.newInstance(subClusterId); + + private Configuration conf; + private FederationStateStore stateStore; + private long lastHearbeatTS = 0; + private JSONJAXBContext jc; + private JSONUnmarshaller unmarshaller; + + @Before + public void setUp() throws IOException, YarnException, JAXBException { + conf = new YarnConfiguration(); + jc = new JSONJAXBContext( + JSONConfiguration.mapped().rootUnwrapping(false).build(), + ClusterMetricsInfo.class); + unmarshaller = jc.createJSONUnmarshaller(); + } + + @After + public void tearDown() throws Exception { + unmarshaller = null; + jc = null; + } + + @Test + public void testFederationStateStoreService() throws Exception { + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + final MockRM rm = new MockRM(conf); + + // Initially there should be no entry for the sub-cluster + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + try { + stateStore.getSubCluster(request); + Assert.fail("There should be no entry for the sub-cluster."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().endsWith("does not exist")); + } + + // Validate if sub-cluster is registered + rm.start(); + String capability = checkSubClusterInfo(SubClusterState.SC_NEW); + Assert.assertTrue(capability.isEmpty()); + + // Heartbeat to see if sub-cluster transitions to running + FederationStateStoreHeartbeat storeHeartbeat = + rm.getFederationStateStoreService().getStateStoreHeartbeatThread(); + storeHeartbeat.run(); + capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); + checkClusterMetricsInfo(capability, 0); + + // heartbeat again after adding a node. + rm.registerNode("127.0.0.1:1234", 4 * 1024); + storeHeartbeat.run(); + capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); + checkClusterMetricsInfo(capability, 1); + + // Validate sub-cluster deregistration + rm.getFederationStateStoreService() + .deregisterSubCluster(SubClusterDeregisterRequest + .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED)); + checkSubClusterInfo(SubClusterState.SC_UNREGISTERED); + + // check after failover + explicitFailover(rm); + + capability = checkSubClusterInfo(SubClusterState.SC_NEW); + Assert.assertTrue(capability.isEmpty()); + + // Heartbeat to see if sub-cluster transitions to running + storeHeartbeat = + rm.getFederationStateStoreService().getStateStoreHeartbeatThread(); + storeHeartbeat.run(); + capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); + checkClusterMetricsInfo(capability, 0); + + // heartbeat again after adding a node. + rm.registerNode("127.0.0.1:1234", 4 * 1024); + storeHeartbeat.run(); + capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); + checkClusterMetricsInfo(capability, 1); + + rm.stop(); + } + + private void explicitFailover(MockRM rm) throws IOException { + rm.getAdminService().transitionToStandby(requestInfo); + Assert.assertTrue(rm.getRMContext() + .getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY); + rm.getAdminService().transitionToActive(requestInfo); + Assert.assertTrue(rm.getRMContext() + .getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE); + lastHearbeatTS = 0; + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + } + + private void checkClusterMetricsInfo(String capability, int numNodes) + throws JAXBException { + ClusterMetricsInfo clusterMetricsInfo = unmarshaller.unmarshalFromJSON( + new StringReader(capability), ClusterMetricsInfo.class); + Assert.assertEquals(numNodes, clusterMetricsInfo.getTotalNodes()); + } + + private String checkSubClusterInfo(SubClusterState state) + throws YarnException { + Assert.assertNotNull(stateStore.getSubCluster(request)); + SubClusterInfo response = + stateStore.getSubCluster(request).getSubClusterInfo(); + Assert.assertEquals(state, response.getState()); + Assert.assertTrue(response.getLastHeartBeat() >= lastHearbeatTS); + lastHearbeatTS = response.getLastHeartBeat(); + return response.getCapability(); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org