Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B6A31200BD8 for ; Wed, 23 Nov 2016 03:38:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B5124160B1F; Wed, 23 Nov 2016 02:38:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 54DA2160B1C for ; Wed, 23 Nov 2016 03:38:48 +0100 (CET) Received: (qmail 80048 invoked by uid 500); 23 Nov 2016 02:38:38 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 77429 invoked by uid 99); 23 Nov 2016 02:38:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Nov 2016 02:38:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 65D7CF1708; Wed, 23 Nov 2016 02:38:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Wed, 23 Nov 2016 02:39:01 -0000 Message-Id: In-Reply-To: <6b7759207b9e49439c2bc7485b3bd495@git.apache.org> References: <6b7759207b9e49439c2bc7485b3bd495@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/50] [abbrv] hadoop git commit: YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru). archived-at: Wed, 23 Nov 2016 02:38:50 -0000 YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru). Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c9110e3b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c9110e3b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c9110e3b Branch: refs/heads/YARN-2915 Commit: c9110e3b55eeff557255360b36b03a223b4957dd Parents: 5579b22 Author: Subru Krishnan Authored: Tue Nov 22 15:02:22 2016 -0800 Committer: Subru Krishnan Committed: Tue Nov 22 18:37:54 2016 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 3 +- .../policies/AbstractPolicyManager.java | 175 ----------------- .../policies/FederationPolicyManager.java | 117 ------------ .../PriorityBroadcastPolicyManager.java | 66 ------- .../federation/policies/RouterPolicyFacade.java | 1 + .../policies/UniformBroadcastPolicyManager.java | 56 ------ .../policies/WeightedLocalityPolicyManager.java | 67 ------- .../policies/manager/AbstractPolicyManager.java | 190 +++++++++++++++++++ .../manager/FederationPolicyManager.java | 118 ++++++++++++ .../manager/HashBroadcastPolicyManager.java | 38 ++++ .../manager/PriorityBroadcastPolicyManager.java | 66 +++++++ .../manager/UniformBroadcastPolicyManager.java | 44 +++++ .../manager/WeightedLocalityPolicyManager.java | 67 +++++++ .../policies/manager/package-info.java | 19 ++ .../policies/router/AbstractRouterPolicy.java | 19 ++ .../policies/router/HashBasedRouterPolicy.java | 81 ++++++++ .../policies/router/LoadBasedRouterPolicy.java | 3 + .../policies/router/PriorityRouterPolicy.java | 3 + .../router/UniformRandomRouterPolicy.java | 10 +- .../router/WeightedRandomRouterPolicy.java | 3 + .../policies/BaseFederationPoliciesTest.java | 17 +- .../policies/BasePolicyManagerTest.java | 108 ----------- ...ionPolicyInitializationContextValidator.java | 1 + .../TestPriorityBroadcastPolicyManager.java | 72 ------- .../policies/TestRouterPolicyFacade.java | 2 + .../TestUniformBroadcastPolicyManager.java | 40 ---- .../TestWeightedLocalityPolicyManager.java | 79 -------- .../policies/manager/BasePolicyManagerTest.java | 104 ++++++++++ .../TestHashBasedBroadcastPolicyManager.java | 40 ++++ .../TestPriorityBroadcastPolicyManager.java | 72 +++++++ .../TestUniformBroadcastPolicyManager.java | 40 ++++ .../TestWeightedLocalityPolicyManager.java | 79 ++++++++ .../policies/router/BaseRouterPoliciesTest.java | 51 +++++ .../router/TestHashBasedRouterPolicy.java | 83 ++++++++ .../router/TestLoadBasedRouterPolicy.java | 3 +- .../router/TestPriorityRouterPolicy.java | 3 +- .../router/TestUniformRandomRouterPolicy.java | 3 +- .../router/TestWeightedRandomRouterPolicy.java | 15 +- 38 files changed, 1160 insertions(+), 798 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6def749..505d2ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2528,7 +2528,8 @@ public class YarnConfiguration extends Configuration { + "policy-manager"; public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache" - + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager"; + + ".hadoop.yarn.server.federation.policies" + + ".manager.UniformBroadcastPolicyManager"; public static final String FEDERATION_POLICY_MANAGER_PARAMS = FEDERATION_PREFIX + "policy-manager-params"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java deleted file mode 100644 index e77f2e3..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class provides basic implementation for common methods that multiple - * policies will need to implement. - */ -public abstract class AbstractPolicyManager implements - FederationPolicyManager { - - private String queue; - @SuppressWarnings("checkstyle:visibilitymodifier") - protected Class routerFederationPolicy; - @SuppressWarnings("checkstyle:visibilitymodifier") - protected Class amrmProxyFederationPolicy; - - public static final Logger LOG = - LoggerFactory.getLogger(AbstractPolicyManager.class); - /** - * This default implementation validates the - * {@link FederationPolicyInitializationContext}, - * then checks whether it needs to reinstantiate the class (null or - * mismatching type), and reinitialize the policy. - * - * @param federationPolicyContext the current context - * @param oldInstance the existing (possibly null) instance. - * - * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy} - * instance - * - * @throws FederationPolicyInitializationException if the reinitalization is - * not valid, and ensure - * previous state is preserved - */ - public FederationAMRMProxyPolicy getAMRMPolicy( - FederationPolicyInitializationContext federationPolicyContext, - FederationAMRMProxyPolicy oldInstance) - throws FederationPolicyInitializationException { - - if (amrmProxyFederationPolicy == null) { - throw new FederationPolicyInitializationException("The parameter " - + "amrmProxyFederationPolicy should be initialized in " - + this.getClass().getSimpleName() + " constructor."); - } - - try { - return (FederationAMRMProxyPolicy) internalPolicyGetter( - federationPolicyContext, oldInstance, amrmProxyFederationPolicy); - } catch (ClassCastException e) { - throw new FederationPolicyInitializationException(e); - } - - } - - /** - * This default implementation validates the - * {@link FederationPolicyInitializationContext}, - * then checks whether it needs to reinstantiate the class (null or - * mismatching type), and reinitialize the policy. - * - * @param federationPolicyContext the current context - * @param oldInstance the existing (possibly null) instance. - * - * @return a valid and fully reinitalized {@link FederationRouterPolicy} - * instance - * - * @throws FederationPolicyInitializationException if the reinitalization is - * not valid, and ensure - * previous state is preserved - */ - - public FederationRouterPolicy getRouterPolicy( - FederationPolicyInitializationContext federationPolicyContext, - FederationRouterPolicy oldInstance) - throws FederationPolicyInitializationException { - - //checks that sub-types properly initialize the types of policies - if (routerFederationPolicy == null) { - throw new FederationPolicyInitializationException("The policy " - + "type should be initialized in " + this.getClass().getSimpleName() - + " constructor."); - } - - try { - return (FederationRouterPolicy) internalPolicyGetter( - federationPolicyContext, oldInstance, routerFederationPolicy); - } catch (ClassCastException e) { - throw new FederationPolicyInitializationException(e); - } - } - - @Override - public String getQueue() { - return queue; - } - - @Override - public void setQueue(String queue) { - this.queue = queue; - } - - /** - * Common functionality to instantiate a reinitialize a {@link - * ConfigurableFederationPolicy}. - */ - private ConfigurableFederationPolicy internalPolicyGetter( - final FederationPolicyInitializationContext federationPolicyContext, - ConfigurableFederationPolicy oldInstance, Class policy) - throws FederationPolicyInitializationException { - - FederationPolicyInitializationContextValidator - .validate(federationPolicyContext, this.getClass().getCanonicalName()); - - if (oldInstance == null || !oldInstance.getClass().equals(policy)) { - try { - oldInstance = (ConfigurableFederationPolicy) policy.newInstance(); - } catch (InstantiationException e) { - throw new FederationPolicyInitializationException(e); - } catch (IllegalAccessException e) { - throw new FederationPolicyInitializationException(e); - } - } - - //copying the context to avoid side-effects - FederationPolicyInitializationContext modifiedContext = - updateContext(federationPolicyContext, - oldInstance.getClass().getCanonicalName()); - - oldInstance.reinitialize(modifiedContext); - return oldInstance; - } - - /** - * This method is used to copy-on-write the context, that will be passed - * downstream to the router/amrmproxy policies. - */ - private FederationPolicyInitializationContext updateContext( - FederationPolicyInitializationContext federationPolicyContext, - String type) { - // copying configuration and context to avoid modification of original - SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration - .newInstance(federationPolicyContext - .getSubClusterPolicyConfiguration()); - newConf.setType(type); - - return new FederationPolicyInitializationContext(newConf, - federationPolicyContext.getFederationSubclusterResolver(), - federationPolicyContext.getFederationStateStoreFacade(), - federationPolicyContext.getHomeSubcluster()); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java deleted file mode 100644 index 39fdba3..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - -/** - * - * Implementors need to provide the ability to serliaze a policy and its - * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide - * (re)initialization mechanics for the underlying - * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}. - * - * The serialization aspects are used by admin APIs or a policy engine to store - * a serialized configuration in the {@code FederationStateStore}, while the - * getters methods are used to obtain a propertly inizialized policy in the - * {@code Router} and {@code AMRMProxy} respectively. - * - * This interface by design binds together {@link FederationAMRMProxyPolicy} and - * {@link FederationRouterPolicy} and provide lifecycle support for - * serialization and deserialization, to reduce configuration mistakes - * (combining incompatible policies). - * - */ -public interface FederationPolicyManager { - - /** - * If the current instance is compatible, this method returns the same - * instance of {@link FederationAMRMProxyPolicy} reinitialized with the - * current context, otherwise a new instance initialized with the current - * context is provided. If the instance is compatible with the current class - * the implementors should attempt to reinitalize (retaining state). To affect - * a complete policy reset oldInstance should be null. - * - * @param policyContext the current context - * @param oldInstance the existing (possibly null) instance. - * - * @return an updated {@link FederationAMRMProxyPolicy }. - * - * @throws FederationPolicyInitializationException if the initialization - * cannot be completed properly. The oldInstance should be still - * valid in case of failed initialization. - */ - FederationAMRMProxyPolicy getAMRMPolicy( - FederationPolicyInitializationContext policyContext, - FederationAMRMProxyPolicy oldInstance) - throws FederationPolicyInitializationException; - - /** - * If the current instance is compatible, this method returns the same - * instance of {@link FederationRouterPolicy} reinitialized with the current - * context, otherwise a new instance initialized with the current context is - * provided. If the instance is compatible with the current class the - * implementors should attempt to reinitalize (retaining state). To affect a - * complete policy reset oldInstance shoulb be set to null. - * - * @param policyContext the current context - * @param oldInstance the existing (possibly null) instance. - * - * @return an updated {@link FederationRouterPolicy}. - * - * @throws FederationPolicyInitializationException if the initalization cannot - * be completed properly. The oldInstance should be still valid in - * case of failed initialization. - */ - FederationRouterPolicy getRouterPolicy( - FederationPolicyInitializationContext policyContext, - FederationRouterPolicy oldInstance) - throws FederationPolicyInitializationException; - - /** - * This method is invoked to derive a {@link SubClusterPolicyConfiguration}. - * This is to be used when writing a policy object in the federation policy - * store. - * - * @return a valid policy configuration representing this object - * parametrization. - * - * @throws FederationPolicyInitializationException if the current state cannot - * be serialized properly - */ - SubClusterPolicyConfiguration serializeConf() - throws FederationPolicyInitializationException; - - /** - * This method returns the queue this policy is configured for. - * - * @return the name of the queue. - */ - String getQueue(); - - /** - * This methods provides a setter for the queue this policy is specified for. - * - * @param queue the name of the queue. - */ - void setQueue(String queue); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java deleted file mode 100644 index ebdcf42..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Policy that allows operator to configure "weights" for routing. This picks a - * {@link PriorityRouterPolicy} for the router and a - * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to - * work together. - */ -public class PriorityBroadcastPolicyManager extends AbstractPolicyManager { - - private WeightedPolicyInfo weightedPolicyInfo; - - public PriorityBroadcastPolicyManager() { - // this structurally hard-codes two compatible policies for Router and - // AMRMProxy. - routerFederationPolicy = PriorityRouterPolicy.class; - amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; - weightedPolicyInfo = new WeightedPolicyInfo(); - } - - @Override - public SubClusterPolicyConfiguration serializeConf() - throws FederationPolicyInitializationException { - ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); - return SubClusterPolicyConfiguration.newInstance(getQueue(), - this.getClass().getCanonicalName(), buf); - } - - @VisibleForTesting - public WeightedPolicyInfo getWeightedPolicyInfo() { - return weightedPolicyInfo; - } - - @VisibleForTesting - public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { - this.weightedPolicyInfo = weightedPolicyInfo; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index a3fd15a..8c22623 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java deleted file mode 100644 index a01f8fa..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - -import java.nio.ByteBuffer; - -/** - * This class represents a simple implementation of a {@code - * FederationPolicyManager}. - * - * It combines the basic policies: {@link UniformRandomRouterPolicy} and - * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and - * "spread" the load among sub-clusters uniformly. - * - * This simple policy might impose heavy load on the RMs and return more - * containers than a job requested as all requests are (replicated and) - * broadcasted. - */ -public class UniformBroadcastPolicyManager - extends AbstractPolicyManager { - - public UniformBroadcastPolicyManager() { - //this structurally hard-codes two compatible policies for Router and - // AMRMProxy. - routerFederationPolicy = UniformRandomRouterPolicy.class; - amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; - } - - @Override - public SubClusterPolicyConfiguration serializeConf() - throws FederationPolicyInitializationException { - ByteBuffer buf = ByteBuffer.allocate(0); - return SubClusterPolicyConfiguration - .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java deleted file mode 100644 index f3c6673..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - -import java.nio.ByteBuffer; - -/** - * Policy that allows operator to configure "weights" for routing. This picks a - * {@link WeightedRandomRouterPolicy} for the router and a {@link - * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to - * work together. - */ -public class WeightedLocalityPolicyManager - extends AbstractPolicyManager { - - private WeightedPolicyInfo weightedPolicyInfo; - - public WeightedLocalityPolicyManager() { - //this structurally hard-codes two compatible policies for Router and - // AMRMProxy. - routerFederationPolicy = WeightedRandomRouterPolicy.class; - amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class; - weightedPolicyInfo = new WeightedPolicyInfo(); - } - - @Override - public SubClusterPolicyConfiguration serializeConf() - throws FederationPolicyInitializationException { - ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); - return SubClusterPolicyConfiguration - .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); - } - - @VisibleForTesting - public WeightedPolicyInfo getWeightedPolicyInfo() { - return weightedPolicyInfo; - } - - @VisibleForTesting - public void setWeightedPolicyInfo( - WeightedPolicyInfo weightedPolicyInfo) { - this.weightedPolicyInfo = weightedPolicyInfo; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java new file mode 100644 index 0000000..f7a89c6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; + +/** + * This class provides basic implementation for common methods that multiple + * policies will need to implement. + */ +public abstract class AbstractPolicyManager implements + FederationPolicyManager { + + private String queue; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class routerFederationPolicy; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class amrmProxyFederationPolicy; + + public static final Logger LOG = + LoggerFactory.getLogger(AbstractPolicyManager.class); + /** + * This default implementation validates the + * {@link FederationPolicyInitializationContext}, + * then checks whether it needs to reinstantiate the class (null or + * mismatching type), and reinitialize the policy. + * + * @param federationPolicyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy} + * instance + * + * @throws FederationPolicyInitializationException if the reinitalization is + * not valid, and ensure + * previous state is preserved + */ + public FederationAMRMProxyPolicy getAMRMPolicy( + FederationPolicyInitializationContext federationPolicyContext, + FederationAMRMProxyPolicy oldInstance) + throws FederationPolicyInitializationException { + + if (amrmProxyFederationPolicy == null) { + throw new FederationPolicyInitializationException("The parameter " + + "amrmProxyFederationPolicy should be initialized in " + + this.getClass().getSimpleName() + " constructor."); + } + + try { + return (FederationAMRMProxyPolicy) internalPolicyGetter( + federationPolicyContext, oldInstance, amrmProxyFederationPolicy); + } catch (ClassCastException e) { + throw new FederationPolicyInitializationException(e); + } + + } + + /** + * This default implementation validates the + * {@link FederationPolicyInitializationContext}, + * then checks whether it needs to reinstantiate the class (null or + * mismatching type), and reinitialize the policy. + * + * @param federationPolicyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return a valid and fully reinitalized {@link FederationRouterPolicy} + * instance + * + * @throws FederationPolicyInitializationException if the reinitalization is + * not valid, and ensure + * previous state is preserved + */ + + public FederationRouterPolicy getRouterPolicy( + FederationPolicyInitializationContext federationPolicyContext, + FederationRouterPolicy oldInstance) + throws FederationPolicyInitializationException { + + //checks that sub-types properly initialize the types of policies + if (routerFederationPolicy == null) { + throw new FederationPolicyInitializationException("The policy " + + "type should be initialized in " + this.getClass().getSimpleName() + + " constructor."); + } + + try { + return (FederationRouterPolicy) internalPolicyGetter( + federationPolicyContext, oldInstance, routerFederationPolicy); + } catch (ClassCastException e) { + throw new FederationPolicyInitializationException(e); + } + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + // default implementation works only for sub-classes which do not require + // any parameters + ByteBuffer buf = ByteBuffer.allocate(0); + return SubClusterPolicyConfiguration + .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); + } + + @Override + public String getQueue() { + return queue; + } + + @Override + public void setQueue(String queue) { + this.queue = queue; + } + + /** + * Common functionality to instantiate a reinitialize a {@link + * ConfigurableFederationPolicy}. + */ + private ConfigurableFederationPolicy internalPolicyGetter( + final FederationPolicyInitializationContext federationPolicyContext, + ConfigurableFederationPolicy oldInstance, Class policy) + throws FederationPolicyInitializationException { + + FederationPolicyInitializationContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + if (oldInstance == null || !oldInstance.getClass().equals(policy)) { + try { + oldInstance = (ConfigurableFederationPolicy) policy.newInstance(); + } catch (InstantiationException e) { + throw new FederationPolicyInitializationException(e); + } catch (IllegalAccessException e) { + throw new FederationPolicyInitializationException(e); + } + } + + //copying the context to avoid side-effects + FederationPolicyInitializationContext modifiedContext = + updateContext(federationPolicyContext, + oldInstance.getClass().getCanonicalName()); + + oldInstance.reinitialize(modifiedContext); + return oldInstance; + } + + /** + * This method is used to copy-on-write the context, that will be passed + * downstream to the router/amrmproxy policies. + */ + private FederationPolicyInitializationContext updateContext( + FederationPolicyInitializationContext federationPolicyContext, + String type) { + // copying configuration and context to avoid modification of original + SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration + .newInstance(federationPolicyContext + .getSubClusterPolicyConfiguration()); + newConf.setType(type); + + return new FederationPolicyInitializationContext(newConf, + federationPolicyContext.getFederationSubclusterResolver(), + federationPolicyContext.getFederationStateStoreFacade(), + federationPolicyContext.getHomeSubcluster()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java new file mode 100644 index 0000000..1434c80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +/** + * + * Implementors need to provide the ability to serliaze a policy and its + * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide + * (re)initialization mechanics for the underlying + * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}. + * + * The serialization aspects are used by admin APIs or a policy engine to store + * a serialized configuration in the {@code FederationStateStore}, while the + * getters methods are used to obtain a propertly inizialized policy in the + * {@code Router} and {@code AMRMProxy} respectively. + * + * This interface by design binds together {@link FederationAMRMProxyPolicy} and + * {@link FederationRouterPolicy} and provide lifecycle support for + * serialization and deserialization, to reduce configuration mistakes + * (combining incompatible policies). + * + */ +public interface FederationPolicyManager { + + /** + * If the current instance is compatible, this method returns the same + * instance of {@link FederationAMRMProxyPolicy} reinitialized with the + * current context, otherwise a new instance initialized with the current + * context is provided. If the instance is compatible with the current class + * the implementors should attempt to reinitalize (retaining state). To affect + * a complete policy reset oldInstance should be null. + * + * @param policyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return an updated {@link FederationAMRMProxyPolicy }. + * + * @throws FederationPolicyInitializationException if the initialization + * cannot be completed properly. The oldInstance should be still + * valid in case of failed initialization. + */ + FederationAMRMProxyPolicy getAMRMPolicy( + FederationPolicyInitializationContext policyContext, + FederationAMRMProxyPolicy oldInstance) + throws FederationPolicyInitializationException; + + /** + * If the current instance is compatible, this method returns the same + * instance of {@link FederationRouterPolicy} reinitialized with the current + * context, otherwise a new instance initialized with the current context is + * provided. If the instance is compatible with the current class the + * implementors should attempt to reinitalize (retaining state). To affect a + * complete policy reset oldInstance shoulb be set to null. + * + * @param policyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return an updated {@link FederationRouterPolicy}. + * + * @throws FederationPolicyInitializationException if the initalization cannot + * be completed properly. The oldInstance should be still valid in + * case of failed initialization. + */ + FederationRouterPolicy getRouterPolicy( + FederationPolicyInitializationContext policyContext, + FederationRouterPolicy oldInstance) + throws FederationPolicyInitializationException; + + /** + * This method is invoked to derive a {@link SubClusterPolicyConfiguration}. + * This is to be used when writing a policy object in the federation policy + * store. + * + * @return a valid policy configuration representing this object + * parametrization. + * + * @throws FederationPolicyInitializationException if the current state cannot + * be serialized properly + */ + SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException; + + /** + * This method returns the queue this policy is configured for. + * + * @return the name of the queue. + */ + String getQueue(); + + /** + * This methods provides a setter for the queue this policy is specified for. + * + * @param queue the name of the queue. + */ + void setQueue(String queue); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java new file mode 100644 index 0000000..08ab08f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy; + +/** + * Policy that routes applications via hashing of their queuename, and broadcast + * resource requests. This picks a {@link HashBasedRouterPolicy} for the router + * and a {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed + * to work together. + */ +public class HashBroadcastPolicyManager extends AbstractPolicyManager { + + public HashBroadcastPolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = HashBasedRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java new file mode 100644 index 0000000..8139e12 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Policy that allows operator to configure "weights" for routing. This picks a + * {@link PriorityRouterPolicy} for the router and a + * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to + * work together. + */ +public class PriorityBroadcastPolicyManager extends AbstractPolicyManager { + + private WeightedPolicyInfo weightedPolicyInfo; + + public PriorityBroadcastPolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = PriorityRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + weightedPolicyInfo = new WeightedPolicyInfo(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); + return SubClusterPolicyConfiguration.newInstance(getQueue(), + this.getClass().getCanonicalName(), buf); + } + + @VisibleForTesting + public WeightedPolicyInfo getWeightedPolicyInfo() { + return weightedPolicyInfo; + } + + @VisibleForTesting + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + this.weightedPolicyInfo = weightedPolicyInfo; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java new file mode 100644 index 0000000..5db0466 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; + +/** + * This class represents a simple implementation of a {@code + * FederationPolicyManager}. + * + * It combines the basic policies: {@link UniformRandomRouterPolicy} and + * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and + * "spread" the load among sub-clusters uniformly. + * + * This simple policy might impose heavy load on the RMs and return more + * containers than a job requested as all requests are (replicated and) + * broadcasted. + */ +public class UniformBroadcastPolicyManager extends AbstractPolicyManager { + + public UniformBroadcastPolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = UniformRandomRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java new file mode 100644 index 0000000..109b534 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import java.nio.ByteBuffer; + +/** + * Policy that allows operator to configure "weights" for routing. This picks a + * {@link WeightedRandomRouterPolicy} for the router and a {@link + * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to + * work together. + */ +public class WeightedLocalityPolicyManager + extends AbstractPolicyManager { + + private WeightedPolicyInfo weightedPolicyInfo; + + public WeightedLocalityPolicyManager() { + //this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = WeightedRandomRouterPolicy.class; + amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class; + weightedPolicyInfo = new WeightedPolicyInfo(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); + return SubClusterPolicyConfiguration + .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); + } + + @VisibleForTesting + public WeightedPolicyInfo getWeightedPolicyInfo() { + return weightedPolicyInfo; + } + + @VisibleForTesting + public void setWeightedPolicyInfo( + WeightedPolicyInfo weightedPolicyInfo) { + this.weightedPolicyInfo = weightedPolicyInfo; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java new file mode 100644 index 0000000..9515c01 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Various implementation of FederationPolicyManager. **/ +package org.apache.hadoop.yarn.server.federation.policies.manager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java index f49af1d..730fb41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java @@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router; import java.util.Map; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -44,4 +47,20 @@ public abstract class AbstractRouterPolicy extends } } + public void validate(ApplicationSubmissionContext appSubmissionContext) + throws FederationPolicyException { + + if (appSubmissionContext == null) { + throw new FederationPolicyException( + "Cannot route an application with null context."); + } + + // if the queue is not specified we set it to default value, to be + // compatible with YARN behavior. + String queue = appSubmissionContext.getQueue(); + if (queue == null) { + appSubmissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java new file mode 100644 index 0000000..e40e87e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * This {@link FederationRouterPolicy} pick a subcluster based on the hash of + * the job's queue name. Useful to provide a default behavior when too many + * queues exist in a system. This also ensures that all jobs belonging to a + * queue are mapped to the same sub-cluster (likely help with locality). + */ +public class HashBasedRouterPolicy extends AbstractRouterPolicy { + + @Override + public void reinitialize( + FederationPolicyInitializationContext federationPolicyContext) + throws FederationPolicyInitializationException { + FederationPolicyInitializationContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + // note: this overrides BaseRouterPolicy and ignores the weights + setPolicyContext(federationPolicyContext); + } + + /** + * Simply picks from alphabetically-sorted active subclusters based on the + * hash of quey name. Jobs of the same queue will all be routed to the same + * sub-cluster, as far as the number of active sub-cluster and their names + * remain the same. + * + * @param appSubmissionContext the context for the app being submitted. + * + * @return a hash-based chosen subcluster. + * + * @throws YarnException if there are no active subclusters. + */ + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) throws YarnException { + + // throws if no active subclusters available + Map activeSubclusters = + getActiveSubclusters(); + + validate(appSubmissionContext); + + int chosenPosition = Math.abs( + appSubmissionContext.getQueue().hashCode() % activeSubclusters.size()); + + List list = new ArrayList<>(activeSubclusters.keySet()); + Collections.sort(list); + return list.get(chosenPosition); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index 5de749f..2ca15bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -64,6 +64,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy { public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index bc3a1f7..13d9140 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -36,6 +36,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index b8f9cc3..d820449 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -48,11 +48,10 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy { } @Override - public void reinitialize( - FederationPolicyInitializationContext policyContext) + public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { - FederationPolicyInitializationContextValidator - .validate(policyContext, this.getClass().getCanonicalName()); + FederationPolicyInitializationContextValidator.validate(policyContext, + this.getClass().getCanonicalName()); // note: this overrides AbstractRouterPolicy and ignores the weights @@ -73,6 +72,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy { public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index ac75ae9..5727134 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -43,6 +43,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index ba897da..6bd8bf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.HashMap; @@ -35,8 +36,10 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.junit.Test; @@ -46,7 +49,7 @@ import org.junit.Test; public abstract class BaseFederationPoliciesTest { private ConfigurableFederationPolicy policy; - private WeightedPolicyInfo policyInfo; + private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class); private Map activeSubclusters = new HashMap<>(); private FederationPolicyInitializationContext federationPolicyContext; private ApplicationSubmissionContext applicationSubmissionContext = @@ -103,7 +106,7 @@ public abstract class BaseFederationPoliciesTest { ((FederationRouterPolicy) localPolicy) .getHomeSubcluster(getApplicationSubmissionContext()); } else { - String[] hosts = new String[] {"host1", "host2" }; + String[] hosts = new String[] {"host1", "host2"}; List resourceRequests = FederationPoliciesTestUtil .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); ((FederationAMRMProxyPolicy) localPolicy) @@ -170,4 +173,14 @@ public abstract class BaseFederationPoliciesTest { this.homeSubCluster = homeSubCluster; } + public void setMockActiveSubclusters(int numSubclusters) { + for (int i = 1; i <= numSubclusters; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java deleted file mode 100644 index c609886..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; -import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; -import org.junit.Assert; -import org.junit.Test; - -/** - * This class provides common test methods for testing {@code - * FederationPolicyManager}s. - */ -public abstract class BasePolicyManagerTest { - - - @SuppressWarnings("checkstyle:visibilitymodifier") - protected FederationPolicyManager wfp = null; - @SuppressWarnings("checkstyle:visibilitymodifier") - protected Class expectedPolicyManager; - @SuppressWarnings("checkstyle:visibilitymodifier") - protected Class expectedAMRMProxyPolicy; - @SuppressWarnings("checkstyle:visibilitymodifier") - protected Class expectedRouterPolicy; - - - @Test - public void testSerializeAndInstantiate() throws Exception { - serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, - expectedAMRMProxyPolicy, - expectedRouterPolicy); - } - - @Test(expected = FederationPolicyInitializationException.class) - public void testSerializeAndInstantiateBad1() throws Exception { - serializeAndDeserializePolicyManager(wfp, String.class, - expectedAMRMProxyPolicy, expectedRouterPolicy); - } - - @Test(expected = AssertionError.class) - public void testSerializeAndInstantiateBad2() throws Exception { - serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, - String.class, expectedRouterPolicy); - } - - @Test(expected = AssertionError.class) - public void testSerializeAndInstantiateBad3() throws Exception { - serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, - expectedAMRMProxyPolicy, String.class); - } - - protected static void serializeAndDeserializePolicyManager( - FederationPolicyManager wfp, Class policyManagerType, - Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception { - - // serializeConf it in a context - SubClusterPolicyConfiguration fpc = - wfp.serializeConf(); - fpc.setType(policyManagerType.getCanonicalName()); - FederationPolicyInitializationContext context = new - FederationPolicyInitializationContext(); - context.setSubClusterPolicyConfiguration(fpc); - context - .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade()); - context.setFederationSubclusterResolver( - FederationPoliciesTestUtil.initResolver()); - context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster")); - - // based on the "context" created instantiate new class and use it - Class c = Class.forName(wfp.getClass().getCanonicalName()); - FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance(); - - FederationAMRMProxyPolicy federationAMRMProxyPolicy = - wfp2.getAMRMPolicy(context, null); - - //needed only for tests (getARMRMPolicy change the "type" in conf) - fpc.setType(wfp.getClass().getCanonicalName()); - - FederationRouterPolicy federationRouterPolicy = - wfp2.getRouterPolicy(context, null); - - Assert.assertEquals(federationAMRMProxyPolicy.getClass(), - expAMRMProxyPolicy); - - Assert.assertEquals(federationRouterPolicy.getClass(), - expRouterPolicy); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index d906b92..611a486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9110e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java deleted file mode 100644 index 5e5bc83..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Simple test of {@link PriorityBroadcastPolicyManager}. - */ -public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest { - - private WeightedPolicyInfo policyInfo; - - @Before - public void setup() { - // configure a policy - - wfp = new PriorityBroadcastPolicyManager(); - wfp.setQueue("queue1"); - SubClusterId sc1 = SubClusterId.newInstance("sc1"); - SubClusterId sc2 = SubClusterId.newInstance("sc2"); - policyInfo = new WeightedPolicyInfo(); - - Map routerWeights = new HashMap<>(); - routerWeights.put(new SubClusterIdInfo(sc1), 0.2f); - routerWeights.put(new SubClusterIdInfo(sc2), 0.8f); - policyInfo.setRouterPolicyWeights(routerWeights); - - ((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo); - - // set expected params that the base test class will use for tests - expectedPolicyManager = PriorityBroadcastPolicyManager.class; - expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class; - expectedRouterPolicy = PriorityRouterPolicy.class; - } - - @Test - public void testPolicyInfoSetCorrectly() throws Exception { - serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, - expectedAMRMProxyPolicy, expectedRouterPolicy); - - // check the policyInfo propagates through ser/der correctly - Assert.assertEquals( - ((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(), - policyInfo); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org