Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 11D66200BC8 for ; Tue, 8 Nov 2016 23:46:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1056A160B0A; Tue, 8 Nov 2016 22:46:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7108F160AD0 for ; Tue, 8 Nov 2016 23:46:21 +0100 (CET) Received: (qmail 91803 invoked by uid 500); 8 Nov 2016 22:46:12 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 84193 invoked by uid 99); 8 Nov 2016 22:46:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Nov 2016 22:46:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E97D0E0BB1; Tue, 8 Nov 2016 22:46:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Tue, 08 Nov 2016 22:46:36 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] hadoop git commit: YARN-3672. Create Facade for Federation State and Policy Store. Contributed by Subru Krishnan archived-at: Tue, 08 Nov 2016 22:46:23 -0000 YARN-3672. Create Facade for Federation State and Policy Store. Contributed by Subru Krishnan Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa0aa288 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa0aa288 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa0aa288 Branch: refs/heads/YARN-2915 Commit: aa0aa2889c3336d252447d790b92ba49395b89d2 Parents: 0abc2e9 Author: Jian He Authored: Wed Aug 17 11:13:19 2016 +0800 Committer: Subru Krishnan Committed: Tue Nov 8 14:43:20 2016 -0800 ---------------------------------------------------------------------- hadoop-project/pom.xml | 13 + .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../yarn/conf/TestYarnConfigurationFields.java | 4 + .../src/main/resources/yarn-default.xml | 20 +- .../hadoop-yarn-server-common/pom.xml | 10 + .../utils/FederationStateStoreFacade.java | 532 +++++++++++++++++++ .../server/federation/utils/package-info.java | 17 + .../utils/FederationStateStoreTestUtil.java | 149 ++++++ .../utils/TestFederationStateStoreFacade.java | 148 ++++++ 9 files changed, 905 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index ca567c5..c05c987 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -94,6 +94,9 @@ 2.0.0-M21 1.0.0-M33 + 1.0.0 + 3.0.3 + 1.8 @@ -1211,6 +1214,16 @@ kerb-simplekdc 1.0.0-RC2 + + javax.cache + cache-api + ${jcache.version} + + + org.ehcache + ehcache + ${ehcache.version} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2339fc7..7c466b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2483,6 +2483,19 @@ public class YarnConfiguration extends Configuration { //////////////////////////////// public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; + + public static final String FEDERATION_STATESTORE_CLIENT_CLASS = + FEDERATION_PREFIX + "state-store.class"; + + public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS = + "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore"; + + public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = + FEDERATION_PREFIX + "cache-ttl.secs"; + + // 5 minutes + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index db9364a..c668757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -92,6 +92,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); + // Federation default configs to be ignored + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS); + // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" configurationPropsToSkipCompare.add(YarnConfiguration. http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 389c0f3..9f2d9c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2591,8 +2591,8 @@ The arguments to pass to the Node label script. yarn.nodemanager.node-labels.provider.script.opts - + Machine list file to be loaded by the FederationSubCluster Resolver @@ -2601,6 +2601,24 @@ + + Store class name for federation state store + + yarn.federation.state-store.class + org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore + + + + + The time in seconds after which the federation state store local cache + will be refreshed periodically + + yarn.federation.cache-ttl.secs + 300 + + + + The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index c16747a..b6fd0c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -109,6 +109,16 @@ org.fusesource.leveldbjni leveldbjni-all + + javax.cache + cache-api + ${jcache.version} + + + org.ehcache + ehcache + ${ehcache.version} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java new file mode 100644 index 0000000..f1c8218 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * + * The FederationStateStoreFacade is an utility wrapper that provides singleton + * access to the Federation state store. It abstracts out retries and in + * addition, it also implements the caching for various objects. + * + */ +public final class FederationStateStoreFacade { + private static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreFacade.class); + + private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; + private static final String GET_POLICIES_CONFIGURATIONS_CACHEID = + "getPoliciesConfigurations"; + + private static final FederationStateStoreFacade FACADE = + new FederationStateStoreFacade(); + + private FederationStateStore stateStore; + private int cacheTimeToLive; + private Configuration conf; + private Cache cache; + + private FederationStateStoreFacade() { + initializeFacadeInternal(new Configuration()); + } + + private void initializeFacadeInternal(Configuration config) { + this.conf = config; + try { + this.stateStore = (FederationStateStore) createRetryInstance(this.conf, + YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS, + FederationStateStore.class, createRetryPolicy(conf)); + this.stateStore.init(conf); + + initCache(); + + } catch (YarnException ex) { + LOG.error("Failed to initialize the FederationStateStoreFacade object", + ex); + throw new RuntimeException(ex); + } + } + + /** + * Delete and re-initialize the cache, to force it to use the given + * configuration. + * + * @param store the {@link FederationStateStore} instance to reinitialize with + * @param config the updated configuration to reinitialize with + */ + @VisibleForTesting + public synchronized void reinitialize(FederationStateStore store, + Configuration config) { + this.conf = config; + this.stateStore = store; + clearCache(); + initCache(); + } + + public static RetryPolicy createRetryPolicy(Configuration conf) { + // Retry settings for StateStore + RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry( + conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE), + conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS), + TimeUnit.MILLISECONDS); + + return retryPolicy; + } + + private boolean isCachingEnabled() { + return (cacheTimeToLive > 0); + } + + private void initCache() { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = + conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + if (isCachingEnabled()) { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); + if (this.cache == null) { + LOG.info("Creating a JCache Manager with name " + + this.getClass().getSimpleName()); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + CompleteConfiguration configuration = + new MutableConfiguration().setStoreByValue(false) + .setReadThrough(true) + .setExpiryPolicyFactory( + new FactoryBuilder.SingletonFactory( + new CreatedExpiryPolicy(cacheExpiry))) + .setCacheLoaderFactory( + new FactoryBuilder.SingletonFactory>( + new CacheLoaderImpl())); + this.cache = jcacheManager.createCache(this.getClass().getSimpleName(), + configuration); + } + } + } + + private void clearCache() { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + + jcacheManager.destroyCache(this.getClass().getSimpleName()); + this.cache = null; + } + + /** + * Returns the singleton instance of the FederationStateStoreFacade object. + * + * @return the singleton {@link FederationStateStoreFacade} instance + */ + public static FederationStateStoreFacade getInstance() { + return FACADE; + } + + /** + * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. + * + * @param subClusterId the identifier of the sub-cluster + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterInfo getSubCluster(final SubClusterId subClusterId) + throws YarnException { + if (isCachingEnabled()) { + return getSubClusters(false).get(subClusterId); + } else { + return stateStore + .getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)) + .getSubClusterInfo(); + } + } + + /** + * Updates the cache with the central {@link FederationStateStore} and returns + * the {@link SubClusterInfo} for the specified {@link SubClusterId}. + * + * @param subClusterId the identifier of the sub-cluster + * @param flushCache flag to indicate if the cache should be flushed or not + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterInfo getSubCluster(final SubClusterId subClusterId, + final boolean flushCache) throws YarnException { + if (flushCache && isCachingEnabled()) { + LOG.info("Flushing subClusters from cache and rehydrating from store," + + " most likely on account of RM failover."); + cache.remove(buildGetSubClustersCacheRequest(false)); + } + return getSubCluster(subClusterId); + } + + /** + * Returns the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @return the information of all active sub cluster(s) + * @throws YarnException if the call to the state store is unsuccessful + */ + @SuppressWarnings("unchecked") + public Map getSubClusters( + final boolean filterInactiveSubClusters) throws YarnException { + try { + if (isCachingEnabled()) { + return (Map) cache + .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } else { + return buildSubClusterInfoMap(stateStore.getSubClusters( + GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters))); + } + } catch (Throwable ex) { + throw new YarnException(ex); + } + } + + /** + * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. + * + * @param queue the queue whose policy is required + * @return the corresponding configured policy + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterPolicyConfiguration getPolicyConfiguration( + final String queue) throws YarnException { + if (isCachingEnabled()) { + return getPoliciesConfigurations().get(queue); + } else { + return stateStore + .getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest.newInstance(queue)) + .getPolicyConfiguration(); + } + + } + + /** + * Get the policies that is represented as + * {@link SubClusterPolicyConfiguration} for all currently active queues in + * the system. + * + * @return the policies for all currently active queues in the system + * @throws YarnException if the call to the state store is unsuccessful + */ + @SuppressWarnings("unchecked") + public Map getPoliciesConfigurations() + throws YarnException { + try { + if (isCachingEnabled()) { + return (Map) cache + .get(buildGetPoliciesConfigurationsCacheRequest()); + } else { + return buildPolicyConfigMap(stateStore.getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest.newInstance())); + } + } catch (Throwable ex) { + throw new YarnException(ex); + } + } + + /** + * Adds the home {@link SubClusterId} for the specified {@link ApplicationId}. + * + * @param appHomeSubCluster the mapping of the application to it's home + * sub-cluster + * @throws YarnException if the call to the state store is unsuccessful + */ + public void addApplicationHomeSubCluster( + ApplicationHomeSubCluster appHomeSubCluster) throws YarnException { + stateStore.addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster)); + return; + } + + /** + * Updates the home {@link SubClusterId} for the specified + * {@link ApplicationId}. + * + * @param appHomeSubCluster the mapping of the application to it's home + * sub-cluster + * @throws YarnException if the call to the state store is unsuccessful + */ + public void updateApplicationHomeSubCluster( + ApplicationHomeSubCluster appHomeSubCluster) throws YarnException { + stateStore.updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster)); + return; + } + + /** + * Returns the home {@link SubClusterId} for the specified + * {@link ApplicationId}. + * + * @param appId the identifier of the application + * @return the home sub cluster identifier + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) + throws YarnException { + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest.newInstance(appId)); + return response.getApplicationHomeSubCluster().getHomeSubCluster(); + } + + /** + * Helper method to create instances of Object using the class name defined in + * the configuration object. The instances creates {@link RetryProxy} using + * the specific {@link RetryPolicy}. + * + * @param conf the yarn configuration + * @param configuredClassName the configuration provider key + * @param defaultValue the default implementation for fallback + * @param type the class for which a retry proxy is required + * @param retryPolicy the policy for retrying method call failures + * @return a retry proxy for the specified interface + */ + @SuppressWarnings("unchecked") + public static Object createRetryInstance(Configuration conf, + String configuredClassName, String defaultValue, Class type, + RetryPolicy retryPolicy) { + + String className = conf.get(configuredClassName, defaultValue); + try { + Class clusterResolverClass = conf.getClassByName(className); + if (type.isAssignableFrom(clusterResolverClass)) { + return RetryProxy.create(type, + (T) ReflectionUtils.newInstance(clusterResolverClass, conf), + retryPolicy); + } else { + throw new YarnRuntimeException( + "Class: " + className + " not instance of " + type.getSimpleName()); + } + } catch (Exception e) { + throw new YarnRuntimeException("Could not instantiate : " + className, e); + } + } + + private Map buildSubClusterInfoMap( + final GetSubClustersInfoResponse response) { + List subClusters = response.getSubClusters(); + Map subClustersMap = + new HashMap<>(subClusters.size()); + for (SubClusterInfo subCluster : subClusters) { + subClustersMap.put(subCluster.getSubClusterId(), subCluster); + } + return subClustersMap; + } + + private Object buildGetSubClustersCacheRequest( + final boolean filterInactiveSubClusters) { + final String cacheKey = buildCacheKey(getClass().getSimpleName(), + GET_SUBCLUSTERS_CACHEID, null); + CacheRequest> cacheRequest = + new CacheRequest>(cacheKey, + new Func>() { + @Override + public Map invoke(String key) + throws Exception { + GetSubClustersInfoResponse subClusters = + stateStore.getSubClusters(GetSubClustersInfoRequest + .newInstance(filterInactiveSubClusters)); + return buildSubClusterInfoMap(subClusters); + } + }); + return cacheRequest; + } + + private Map buildPolicyConfigMap( + GetSubClusterPoliciesConfigurationsResponse response) { + List policyConfigs = + response.getPoliciesConfigs(); + Map queuePolicyConfigs = + new HashMap<>(); + for (SubClusterPolicyConfiguration policyConfig : policyConfigs) { + queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig); + } + return queuePolicyConfigs; + } + + private Object buildGetPoliciesConfigurationsCacheRequest() { + final String cacheKey = buildCacheKey(getClass().getSimpleName(), + GET_POLICIES_CONFIGURATIONS_CACHEID, null); + CacheRequest> cacheRequest = + new CacheRequest>( + cacheKey, + new Func>() { + @Override + public Map invoke( + String key) throws Exception { + GetSubClusterPoliciesConfigurationsResponse policyConfigs = + stateStore.getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest + .newInstance()); + return buildPolicyConfigMap(policyConfigs); + } + }); + return cacheRequest; + } + + protected String buildCacheKey(String typeName, String methodName, + String argName) { + StringBuilder buffer = new StringBuilder(); + buffer.append(typeName).append("."); + buffer.append(methodName); + if (argName != null) { + buffer.append("::"); + buffer.append(argName); + } + return buffer.toString(); + } + + /** + * Internal class that implements the CacheLoader interface that can be + * plugged into the CacheManager to load objects into the cache for specified + * keys. + */ + private static class CacheLoaderImpl implements CacheLoader { + @SuppressWarnings("unchecked") + @Override + public V load(K key) throws CacheLoaderException { + try { + CacheRequest query = (CacheRequest) key; + assert query != null; + return query.getValue(); + } catch (Throwable ex) { + throw new CacheLoaderException(ex); + } + } + + @Override + public Map loadAll(Iterable keys) + throws CacheLoaderException { + // The FACADE does not use the Cache's getAll API. Hence this is not + // required to be implemented + throw new NotImplementedException(); + } + } + + /** + * Internal class that encapsulates the cache key and a function that returns + * the value for the specified key. + */ + private static class CacheRequest { + private K key; + private Func func; + + public CacheRequest(K key, Func func) { + this.key = key; + this.func = func; + } + + public V getValue() throws Exception { + return func.invoke(key); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((key == null) ? 0 : key.hashCode()); + return result; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + CacheRequest other = (CacheRequest) obj; + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + + return true; + } + } + + /** + * Encapsulates a method that has one parameter and returns a value of the + * type specified by the TResult parameter. + */ + protected interface Func { + TResult invoke(T input) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java new file mode 100644 index 0000000..39a46ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.utils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java new file mode 100644 index 0000000..c179521 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * Utility class for FederationStateStore unit tests. + */ +public class FederationStateStoreTestUtil { + + private static final MonotonicClock CLOCK = new MonotonicClock(); + + public static final String SC_PREFIX = "SC-"; + public static final String Q_PREFIX = "queue-"; + public static final String POLICY_PREFIX = "policy-"; + + private FederationStateStore stateStore; + + public FederationStateStoreTestUtil(FederationStateStore stateStore) { + this.stateStore = stateStore; + } + + private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { + + String amRMAddress = "1.2.3.4:1"; + String clientRMAddress = "1.2.3.4:2"; + String rmAdminAddress = "1.2.3.4:3"; + String webAppAddress = "1.2.3.4:4"; + + return SubClusterInfo.newInstance(subClusterId, amRMAddress, + clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, + CLOCK.getTime(), "capability"); + } + + private void registerSubCluster(SubClusterId subClusterId) + throws YarnException { + + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + } + + public void registerSubClusters(int numSubClusters) throws YarnException { + + for (int i = 0; i < numSubClusters; i++) { + registerSubCluster(SubClusterId.newInstance(SC_PREFIX + i)); + } + } + + private void addApplicationHomeSC(ApplicationId appId, + SubClusterId subClusterId) throws YarnException { + ApplicationHomeSubCluster ahsc = + ApplicationHomeSubCluster.newInstance(appId, subClusterId); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(ahsc); + stateStore.addApplicationHomeSubCluster(request); + } + + public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException { + for (int i = 0; i < numApps; i++) { + addApplicationHomeSC(ApplicationId.newInstance(clusterTs, i), + SubClusterId.newInstance(SC_PREFIX + i)); + } + } + + private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, + String policyType) { + return SubClusterPolicyConfiguration.newInstance(queueName, policyType, + ByteBuffer.allocate(1)); + } + + private void setPolicyConf(String queue, String policyType) + throws YarnException { + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest + .newInstance(createSCPolicyConf(queue, policyType)); + stateStore.setPolicyConfiguration(request); + } + + public void addPolicyConfigs(int numQueues) throws YarnException { + + for (int i = 0; i < numQueues; i++) { + setPolicyConf(Q_PREFIX + i, POLICY_PREFIX + i); + } + } + + public SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) + throws YarnException { + GetSubClusterInfoRequest request = + GetSubClusterInfoRequest.newInstance(subClusterId); + return stateStore.getSubCluster(request).getSubClusterInfo(); + } + + public SubClusterId queryApplicationHomeSC(ApplicationId appId) + throws YarnException { + GetApplicationHomeSubClusterRequest request = + GetApplicationHomeSubClusterRequest.newInstance(appId); + + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster(request); + + return response.getApplicationHomeSubCluster().getHomeSubCluster(); + } + + public SubClusterPolicyConfiguration queryPolicyConfiguration(String queue) + throws YarnException { + GetSubClusterPolicyConfigurationRequest request = + GetSubClusterPolicyConfigurationRequest.newInstance(queue); + + GetSubClusterPolicyConfigurationResponse result = + stateStore.getPolicyConfiguration(request); + return result.getPolicyConfiguration(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa0aa288/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java new file mode 100644 index 0000000..53f4f84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Unit tests for FederationStateStoreFacade. + */ +@RunWith(Parameterized.class) +public class TestFederationStateStoreFacade { + + @Parameters + public static Collection getParameters() { + return Arrays + .asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } }); + } + + private final long clusterTs = System.currentTimeMillis(); + private final int numSubClusters = 3; + private final int numApps = 5; + private final int numQueues = 2; + + private Configuration conf; + private FederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreTestUtil; + private FederationStateStoreFacade facade = + FederationStateStoreFacade.getInstance(); + + public TestFederationStateStoreFacade(Boolean isCachingEnabled) { + conf = new Configuration(); + if (!(isCachingEnabled.booleanValue())) { + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + } + } + + @Before + public void setUp() throws IOException, YarnException { + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + facade.reinitialize(stateStore, conf); + // hydrate the store + stateStoreTestUtil = new FederationStateStoreTestUtil(stateStore); + stateStoreTestUtil.registerSubClusters(numSubClusters); + stateStoreTestUtil.addAppsHomeSC(clusterTs, numApps); + stateStoreTestUtil.addPolicyConfigs(numQueues); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + @Test + public void testGetSubCluster() throws YarnException { + for (int i = 0; i < numSubClusters; i++) { + SubClusterId subClusterId = + SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i); + Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId), + facade.getSubCluster(subClusterId)); + } + } + + @Test + public void testGetSubClusterFlushCache() throws YarnException { + for (int i = 0; i < numSubClusters; i++) { + SubClusterId subClusterId = + SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i); + Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId), + facade.getSubCluster(subClusterId, true)); + } + } + + @Test + public void testGetSubClusters() throws YarnException { + Map subClusters = + facade.getSubClusters(false); + for (SubClusterId subClusterId : subClusters.keySet()) { + Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId), + subClusters.get(subClusterId)); + } + } + + @Test + public void testGetPolicyConfiguration() throws YarnException { + for (int i = 0; i < numQueues; i++) { + String queue = FederationStateStoreTestUtil.Q_PREFIX + i; + Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue), + facade.getPolicyConfiguration(queue)); + } + } + + @Test + public void testGetPoliciesConfigurations() throws YarnException { + Map queuePolicies = + facade.getPoliciesConfigurations(); + for (String queue : queuePolicies.keySet()) { + Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue), + queuePolicies.get(queue)); + } + } + + @Test + public void testGetHomeSubClusterForApp() throws YarnException { + for (int i = 0; i < numApps; i++) { + ApplicationId appId = ApplicationId.newInstance(clusterTs, i); + Assert.assertEquals(stateStoreTestUtil.queryApplicationHomeSC(appId), + facade.getApplicationHomeSubCluster(appId)); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org