Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C032E200AEF for ; Sat, 28 May 2016 20:59:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BEBFF160939; Sat, 28 May 2016 18:59:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 967CF160A34 for ; Sat, 28 May 2016 20:59:31 +0200 (CEST) Received: (qmail 46884 invoked by uid 500); 28 May 2016 18:59:30 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 46783 invoked by uid 99); 28 May 2016 18:59:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 May 2016 18:59:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 840EFE0158; Sat, 28 May 2016 18:59:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jonathanhurley@apache.org To: commits@ambari.apache.org Date: Sat, 28 May 2016 18:59:31 -0000 Message-Id: <3a1c0582c3504d96a6111fd2b3d71464@git.apache.org> In-Reply-To: <7b519541337a4ef48add8beb8a065043@git.apache.org> References: <7b519541337a4ef48add8beb8a065043@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] ambari git commit: AMBARI-16913 - Web Client Requests Handled By Jetty Should Not Be Blocked By JMX Property Providers (jonathanhurley) archived-at: Sat, 28 May 2016 18:59:33 -0000 AMBARI-16913 - Web Client Requests Handled By Jetty Should Not Be Blocked By JMX Property 
Providers (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/384a4bea Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/384a4bea Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/384a4bea Branch: refs/heads/branch-2.4 Commit: 384a4beaf50ad1c3a779f48066279cb650b5eb0c Parents: ebdbe6f Author: Jonathan Hurley Authored: Fri May 27 13:26:30 2016 -0400 Committer: Jonathan Hurley Committed: Sat May 28 14:56:05 2016 -0400 ---------------------------------------------------------------------- .../org/apache/ambari/server/AmbariService.java | 5 +- .../server/configuration/Configuration.java | 154 +++++- .../controller/AmbariManagementController.java | 9 + .../AmbariManagementControllerImpl.java | 9 + .../ambari/server/controller/AmbariServer.java | 16 +- .../server/controller/ControllerModule.java | 13 +- .../internal/AbstractProviderModule.java | 23 +- .../internal/StackDefinedPropertyProvider.java | 60 ++- .../controller/jmx/JMXPropertyProvider.java | 166 +++--- .../metrics/MetricPropertyProviderFactory.java | 103 ++++ .../metrics/RestMetricsPropertyProvider.java | 160 +++--- .../ThreadPoolEnabledPropertyProvider.java | 131 +++-- ...eredThreadPoolExecutorCompletionService.java | 58 +- .../utilities/ScalingThreadPoolExecutor.java | 36 +- .../state/services/MetricsRetrievalService.java | 539 +++++++++++++++++++ .../server/configuration/ConfigurationTest.java | 72 +++ .../StackDefinedPropertyProviderTest.java | 48 +- .../metrics/JMXPropertyProviderTest.java | 64 ++- .../RestMetricsPropertyProviderTest.java | 40 +- ...ThreadPoolExecutorCompletionServiceTest.java | 15 +- .../utils/SynchronousThreadPoolExecutor.java | 85 +++ 21 files changed, 1493 insertions(+), 313 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java b/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java index 186e272..86ad1cf 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java @@ -22,15 +22,14 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; import com.google.inject.ScopeAnnotation; import com.google.inject.Singleton; /** * The {@link AmbariService} annotation is used to register a class that - * implements Guava's {@link AbstractScheduledService} with the - * {@link ServiceManager}. + * implements Guava's {@link Service} with the {@link ServiceManager}. *

* Classes with this annotation are bound as singletons and automatically * injected with their members. There is not need to use {@link Singleton} or http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 0c2fbba..d104cb6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.ambari.annotations.Experimental; import org.apache.ambari.annotations.ExperimentalFeature; @@ -48,6 +50,7 @@ import org.apache.ambari.server.security.authorization.LdapServerProperties; import org.apache.ambari.server.security.authorization.jwt.JwtAuthenticationProperties; import org.apache.ambari.server.security.encryption.CertificateUtils; import org.apache.ambari.server.security.encryption.CredentialProvider; +import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.ambari.server.state.stack.OsFamily; import org.apache.ambari.server.utils.AmbariPath; import org.apache.ambari.server.utils.Parallel; @@ -566,10 +569,20 @@ public class Configuration { private static final int VIEW_REQUEST_THREADPOOL_TIMEOUT_DEFAULT = 2000; + /** + * Threadpool sizing based on the number of available processors multiplied by 2. 
+ */ + public static final int PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT = 2 * Runtime.getRuntime().availableProcessors(); + + /** + * Threadpool sizing based on the number of available processors multiplied by 4. + */ + public static final int PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT = 4 * Runtime.getRuntime().availableProcessors(); + public static final String PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY = "server.property-provider.threadpool.size.max"; - public static final int PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_DEFAULT = 4 * Runtime.getRuntime().availableProcessors(); public static final String PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY = "server.property-provider.threadpool.size.core"; - public static final int PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_DEFAULT = 2 * Runtime.getRuntime().availableProcessors(); + public static final String PROPERTY_PROVIDER_THREADPOOL_WORKER_QUEUE_SIZE = "server.property-provider.threadpool.worker.size"; + public static final String PROPERTY_PROVIDER_THREADPOOL_COMPLETION_TIMEOUT = "server.property-provider.threadpool.completion.timeout"; private static final String SERVER_HTTP_SESSION_INACTIVE_TIMEOUT = "server.http.session.inactive_timeout"; @@ -720,6 +733,39 @@ public class Configuration { public static final String ALERTS_SNMP_DISPATCH_UDP_PORT = "alerts.snmp.dispatcher.udp.port"; + /** + * The amount of time that the {@link MetricsRetrievalService} will cache + * retrieved metric data. + */ + public static final String METRIC_RETRIEVAL_SERVICE_CACHE_TIMEOUT = "metrics.retrieval-service.cache.timeout"; + + /** + * The priority of the {@link Thread}s used by the + * {@link MetricsRetrievalService}. This is a value in between + * {@link Thread#MIN_PRIORITY} and {@link Thread#MAX_PRIORITY}. + */ + public static final String METRIC_RETRIEVAL_SERVICE_THREAD_PRIORITY = "server.metrics.retrieval-service.thread.priority"; + + /** + * The maximum size of the threadpool for the {@link MetricsRetrievalService}. 
+ * This value is only applicable if the + * {@link #METRIC_RETRIEVAL_SERVICE_THREADPOOL_WORKER_QUEUE_SIZE} is small + * enough to trigger the {@link ThreadPoolExecutor} to create new threads. + */ + public static final String METRIC_RETRIEVAL_SERVICE_THREADPOOL_MAX_SIZE = "server.metrics.retrieval-service.threadpool.size.max"; + + /** + * The core size of the threadpool for the {@link MetricsRetrievalService}. + */ + public static final String METRIC_RETRIEVAL_SERVICE_THREADPOOL_CORE_SIZE = "server.metrics.retrieval-service.threadpool.size.core"; + + /** + * The size of the worker queue for the {@link MetricsRetrievalService}. The + * larger this queue is, the less likely it will be to create more threads + * beyond the core size. + */ + public static final String METRIC_RETRIEVAL_SERVICE_THREADPOOL_WORKER_QUEUE_SIZE = "server.metrics.retrieval-service.threadpool.worker.size"; + private static final Logger LOG = LoggerFactory.getLogger( Configuration.class); @@ -2465,7 +2511,7 @@ public class Configuration { */ public int getPropertyProvidersThreadPoolCoreSize() { return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY, - String.valueOf(PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_DEFAULT))); + String.valueOf(PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT))); } /** @@ -2475,7 +2521,31 @@ public class Configuration { */ public int getPropertyProvidersThreadPoolMaxSize() { return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY, - String.valueOf(PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_DEFAULT))); + String.valueOf(PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT))); + } + + /** + * Get property-providers' worker queue size. This will return + * {@link Integer#MAX_VALUE} if not specified which will allow an unbounded + * queue and essentially a fixed core threadpool size. + * + * @return the property-providers' worker queue size. 
+ */ + public int getPropertyProvidersWorkerQueueSize() { + return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_WORKER_QUEUE_SIZE, + String.valueOf(Integer.MAX_VALUE))); + } + + /** + * Get property-providers' timeout value in milliseconds for waiting on the + * completion of submitted {@link Callable}s. This will return {@code 5000} + * if not specified. + * + * @return the property-providers' completion service timeout, in millis. + */ + public long getPropertyProvidersCompletionServiceTimeout() { + return Long.parseLong(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_COMPLETION_TIMEOUT, + String.valueOf(5000))); } /** @@ -3037,4 +3107,80 @@ public class Configuration { public boolean isLdapAlternateUserSearchEnabled() { return Boolean.parseBoolean(properties.getProperty(LDAP_ALT_USER_SEARCH_ENABLED_KEY, LDAP_ALT_USER_SEARCH_ENABLED_DEFAULT)); } + + /** + * Gets the number of minutes that data cached by the + * {@link MetricsRetrievalService} is kept. The longer this value is, the + * older the data will be when a user first logs in. After that first login, + * data will be updated by the {@link MetricsRetrievalService} as long as + * incoming REST requests are made. + *

+ * It is recommended that this value be longer rather than shorter since the + * performance benefit of the cache greatly outweighs the data loaded after + * first login. + * + * @return the number of minutes, defaulting to 30 if not specified. + */ + public int getMetricsServiceCacheTimeout() { + return Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_CACHE_TIMEOUT, "30")); + } + + /** + * Gets the priority of the {@link Thread}s used by the + * {@link MetricsRetrievalService}. This will be a value within the range of + * {@link Thread#MIN_PRIORITY} and {@link Thread#MAX_PRIORITY}. + * + * @return the thread priority. + */ + public int getMetricsServiceThreadPriority() { + int priority = Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREAD_PRIORITY, + String.valueOf(Thread.NORM_PRIORITY))); + + if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { + priority = Thread.NORM_PRIORITY; + } + + return priority; + } + + /** + * Gets the core pool size used for the {@link MetricsRetrievalService}. + * + * @return the core pool size or + * {@value #PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT} if not + * specified. + */ + public int getMetricsServiceThreadPoolCoreSize() { + return Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREADPOOL_CORE_SIZE, + String.valueOf(PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT))); + } + + /** + * Gets the max pool size used for the {@link MetricsRetrievalService}. + * Threads will only be increased up to this value if the worker queue is + * exhausted and rejects the new task. + * + * @return the max pool size, or + * {@value #PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT} if not + * specified. 
+ * @see #getMetricsServiceWorkerQueueSize() + */ + public int getMetricsServiceThreadPoolMaxSize() { + return Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREADPOOL_MAX_SIZE, + String.valueOf(PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT))); + } + + /** + * Gets the queue size of the worker queue for the + * {@link MetricsRetrievalService}. + * + * @return the worker queue size, or {@code 10 *} + * {@link #getMetricsServiceThreadPoolMaxSize()} if not specified. + */ + public int getMetricsServiceWorkerQueueSize() { + return Integer.parseInt( + properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREADPOOL_WORKER_QUEUE_SIZE, + String.valueOf(10 * getMetricsServiceThreadPoolMaxSize()))); + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java index d6b9d0e..10a1f8c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java @@ -29,6 +29,7 @@ import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.internal.RequestStageContainer; +import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.metadata.RoleCommandOrder; import org.apache.ambari.server.scheduler.ExecutionScheduleManager; @@ -797,6 +798,14 @@ public 
interface AmbariManagementController { TimelineMetricCacheProvider getTimelineMetricCacheProvider(); /** + * Gets the {@link MetricPropertyProviderFactory} that was injected into this + * class. This is a terrible pattern. + * + * @return the injected {@link MetricPropertyProviderFactory} + */ + MetricPropertyProviderFactory getMetricPropertyProviderFactory(); + + /** * Returns KerberosHelper instance * @return */ http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index f4a615c..79a4e90 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -93,6 +93,7 @@ import org.apache.ambari.server.controller.internal.RequestStageContainer; import org.apache.ambari.server.controller.internal.URLStreamProvider; import org.apache.ambari.server.controller.internal.WidgetLayoutResourceProvider; import org.apache.ambari.server.controller.internal.WidgetResourceProvider; +import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.customactions.ActionDefinition; @@ -4715,6 +4716,14 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle return injector.getInstance(TimelineMetricCacheProvider.class); } + /** + * {@inheritDoc} + */ + @Override + public MetricPropertyProviderFactory 
getMetricPropertyProviderFactory() { + return injector.getInstance(MetricPropertyProviderFactory.class); + } + @Override public KerberosHelper getKerberosHelper() { return kerberosHelper; http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index 99a6cab..d17a451 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -166,6 +166,16 @@ import com.sun.jersey.spi.container.servlet.ServletContainer; public class AmbariServer { private static Logger LOG = LoggerFactory.getLogger(AmbariServer.class); + /** + * The thread name prefix for threads handling agent requests. + */ + private static final String AGENT_THREAD_POOL_NAME = "ambari-agent-thread"; + + /** + * The thread name prefix for threads handling REST API requests. 
+ */ + private static final String CLIENT_THREAD_POOL_NAME = "ambari-client-thread"; + // Set velocity logger protected static final String VELOCITY_LOG_CATEGORY = "VelocityLogger"; @@ -458,7 +468,7 @@ public class AmbariServer { // Agent Jetty thread pool configureJettyThreadPool(serverForAgent, sslConnectorOneWay.getAcceptors(), - "qtp-ambari-agent", configs.getAgentThreadPoolSize()); + AGENT_THREAD_POOL_NAME, configs.getAgentThreadPoolSize()); serverForAgent.addConnector(sslConnectorOneWay); serverForAgent.addConnector(sslConnectorTwoWay); @@ -468,7 +478,7 @@ public class AmbariServer { agentConnector.setIdleTimeout(configs.getConnectionMaxIdleTime()); // Agent Jetty thread pool - configureJettyThreadPool(serverForAgent, agentConnector.getAcceptors(), "qtp-ambari-agent", + configureJettyThreadPool(serverForAgent, agentConnector.getAcceptors(), AGENT_THREAD_POOL_NAME, configs.getAgentThreadPoolSize()); serverForAgent.addConnector(agentConnector); @@ -575,7 +585,7 @@ public class AmbariServer { } // Client Jetty thread pool - configureJettyThreadPool(server, apiConnector.getAcceptors(), "qtp-ambari-client", configs.getClientThreadPoolSize()); + configureJettyThreadPool(server, apiConnector.getAcceptors(), CLIENT_THREAD_POOL_NAME, configs.getClientThreadPoolSize()); server.addConnector(apiConnector); server.setStopAtShutdown(true); http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index 617553b..e0bda13 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -70,6 +70,7 @@ import 
org.apache.ambari.server.controller.internal.MemberResourceProvider; import org.apache.ambari.server.controller.internal.RepositoryVersionResourceProvider; import org.apache.ambari.server.controller.internal.ServiceResourceProvider; import org.apache.ambari.server.controller.internal.UpgradeResourceProvider; +import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.ResourceProvider; @@ -143,7 +144,6 @@ import org.springframework.security.web.AuthenticationEntryPoint; import org.springframework.util.ClassUtils; import org.springframework.web.filter.DelegatingFilterProxy; -import com.google.common.util.concurrent.AbstractScheduledService; import com.google.common.util.concurrent.ServiceManager; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -456,6 +456,7 @@ public class ControllerModule extends AbstractModule { install(new FactoryModuleBuilder().build(RequestFactory.class)); install(new FactoryModuleBuilder().build(StackManagerFactory.class)); install(new FactoryModuleBuilder().build(ExecutionCommandWrapperFactory.class)); + install(new FactoryModuleBuilder().build(MetricPropertyProviderFactory.class)); bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class); bind(SecurityHelper.class).toInstance(SecurityHelperImpl.getInstance()); @@ -528,9 +529,9 @@ public class ControllerModule extends AbstractModule { // Ambari services are registered with Guava if (null != clazz.getAnnotation(AmbariService.class)) { // safety check to ensure it's actually a Guava service - if (!AbstractScheduledService.class.isAssignableFrom(clazz)) { + if (!com.google.common.util.concurrent.Service.class.isAssignableFrom(clazz)) { String message = MessageFormat.format( - "Unable to register service 
{0} because it is not an AbstractScheduledService", + "Unable to register service {0} because it is not a Service which can be scheduled", clazz); LOG.warn(message); @@ -538,10 +539,10 @@ public class ControllerModule extends AbstractModule { } // instantiate the service, register as singleton via toInstance() - AbstractScheduledService service = null; + com.google.common.util.concurrent.Service service = null; try { - service = (AbstractScheduledService) clazz.newInstance(); - bind((Class) clazz).toInstance(service); + service = (com.google.common.util.concurrent.Service) clazz.newInstance(); + bind((Class) clazz).toInstance(service); services.add(service); LOG.debug("Registering service {} ", clazz); } catch (Exception exception) { http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java index 255accc..92b16aa 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java @@ -43,9 +43,9 @@ import org.apache.ambari.server.controller.HostResponse; import org.apache.ambari.server.controller.ServiceComponentHostRequest; import org.apache.ambari.server.controller.ServiceComponentHostResponse; import org.apache.ambari.server.controller.jmx.JMXHostProvider; -import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; import org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricHostProvider; +import 
org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricsServiceProvider; @@ -167,11 +167,11 @@ public abstract class AbstractProviderModule implements ProviderModule, initPropMap = new HashMap(); initPropMap.put("RESOURCEMANAGER", new String[]{"yarn.http.policy"}); jmxDesiredProperties.put("RESOURCEMANAGER", initPropMap); - + initPropMap = new HashMap(); initPropMap.put("HISTORYSERVER", new String[]{"mapreduce.jobhistory.http.policy"}); jmxDesiredProperties.put("HISTORYSERVER", initPropMap); - + initPropMap = new HashMap(); initPropMap.put("client", new String[]{"dfs.namenode.rpc-address"}); initPropMap.put("datanode", new String[]{"dfs.namenode.servicerpc-address"}); @@ -208,6 +208,13 @@ public abstract class AbstractProviderModule implements ProviderModule, TimelineMetricCacheProvider metricCacheProvider; /** + * A factory used to retrieve Guice-injected instances of a metric + * {@link PropertyProvider}. + */ + @Inject + private MetricPropertyProviderFactory metricPropertyProviderFactory; + + /** * The map of host components. 
*/ private Map> clusterHostComponentMap; @@ -243,9 +250,14 @@ public abstract class AbstractProviderModule implements ProviderModule, if (managementController == null) { managementController = AmbariServer.getController(); } + if (metricCacheProvider == null && managementController != null) { metricCacheProvider = managementController.getTimelineMetricCacheProvider(); } + + if (metricPropertyProviderFactory == null && managementController != null) { + metricPropertyProviderFactory = managementController.getMetricPropertyProviderFactory(); + } } @@ -1085,7 +1097,8 @@ public abstract class AbstractProviderModule implements ProviderModule, String componentNamePropertyId, String statePropertyId) { - return new JMXPropertyProvider(PropertyHelper.getJMXPropertyIds(type), streamProvider, + return metricPropertyProviderFactory.createJMXPropertyProvider( + PropertyHelper.getJMXPropertyIds(type), streamProvider, jmxHostProvider, metricsHostProvider, clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId, statePropertyId); } @@ -1202,7 +1215,7 @@ public abstract class AbstractProviderModule implements ProviderModule, } private String getJMXProtocolString(String value) { - if (value.equals(PROPERTY_HDFS_HTTP_POLICY_VALUE_HTTPS_ONLY)) { + if (PROPERTY_HDFS_HTTP_POLICY_VALUE_HTTPS_ONLY.equals(value)) { return "https"; } else { return "http"; http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java index 6c40d14..f5040e5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java @@ -17,15 +17,25 @@ */ package org.apache.ambari.server.controller.internal; -import com.google.inject.Inject; -import com.google.inject.Injector; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; import org.apache.ambari.server.controller.jmx.JMXHostProvider; import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricHostProvider; +import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricsServiceProvider; +import org.apache.ambari.server.controller.metrics.RestMetricsPropertyProvider; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.PropertyProvider; @@ -42,15 +52,8 @@ import org.apache.ambari.server.state.stack.MetricDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; +import com.google.inject.Inject; +import com.google.inject.Injector; /** * This class analyzes a service's metrics to determine if additional @@ -62,11 +65,20 @@ public class StackDefinedPropertyProvider implements 
PropertyProvider { @Inject private static Clusters clusters = null; + @Inject private static AmbariMetaInfo metaInfo = null; + @Inject private static Injector injector = null; + /** + * A factory used to retrieve Guice-injected instances of a metric + * {@link PropertyProvider}. + */ + @Inject + private static MetricPropertyProviderFactory metricPropertyProviderFactory; + private Resource.Type type = null; private String clusterNamePropertyId = null; private String hostNamePropertyId = null; @@ -92,6 +104,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider { public static void init(Injector injector) { clusters = injector.getInstance(Clusters.class); metaInfo = injector.getInstance(AmbariMetaInfo.class); + metricPropertyProviderFactory = injector.getInstance(MetricPropertyProviderFactory.class); StackDefinedPropertyProvider.injector = injector; } @@ -108,12 +121,14 @@ public class StackDefinedPropertyProvider implements PropertyProvider { PropertyProvider defaultGangliaPropertyProvider) { this.metricHostProvider = metricHostProvider; - this.metricsServiceProvider = serviceProvider; + metricsServiceProvider = serviceProvider; - if (null == clusterPropertyId) + if (null == clusterPropertyId) { throw new NullPointerException("Cluster name property id cannot be null"); - if (null == componentPropertyId) + } + if (null == componentPropertyId) { throw new NullPointerException("Component name property id cannot be null"); + } this.type = type; @@ -152,8 +167,9 @@ public class StackDefinedPropertyProvider implements PropertyProvider { List defs = metaInfo.getMetrics( stack.getStackName(), stack.getStackVersion(), svc, componentName, type.name()); - if (null == defs || 0 == defs.size()) + if (null == defs || 0 == defs.size()) { continue; + } for (MetricDefinition m : defs) { if (m.getType().equals("ganglia")) { @@ -192,7 +208,8 @@ public class StackDefinedPropertyProvider implements PropertyProvider { } if (jmxMap.size() > 0) { - JMXPropertyProvider 
jpp = new JMXPropertyProvider(jmxMap, streamProvider, + JMXPropertyProvider jpp = metricPropertyProviderFactory.createJMXPropertyProvider(jmxMap, + streamProvider, jmxHostProvider, metricHostProvider, clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId, resourceStatePropertyId); @@ -308,12 +325,21 @@ public class StackDefinedPropertyProvider implements PropertyProvider { try { Class clz = Class.forName(definition.getType()); + + // use a Factory for the REST provider + if (clz.equals(RestMetricsPropertyProvider.class)) { + return metricPropertyProviderFactory.createRESTMetricsPropertyProvider( + definition.getProperties(), componentMetrics, streamProvider, metricsHostProvider, + clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId, statePropertyId, + componentName); + } + try { /* * Warning: this branch is already used, that's why please adjust * all implementations when modifying constructor interface */ - Constructor ct = clz.getConstructor(Injector.class, Map.class, + Constructor ct = clz.getConstructor(Map.class, Map.class, StreamProvider.class, MetricHostProvider.class, String.class, String.class, String.class, String.class, String.class); Object o = ct.newInstance( http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java index 1ccc5df..e1a5e22 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java @@ -18,6 +18,19 @@ package org.apache.ambari.server.controller.jmx; +import java.io.IOException; +import java.util.Collections; 
+import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.internal.PropertyInfo; import org.apache.ambari.server.controller.metrics.MetricHostProvider; @@ -27,26 +40,43 @@ import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.utilities.StreamProvider; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.codehaus.jackson.type.TypeReference; +import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; /** - * Property provider implementation for JMX sources. + * The {@link JMXPropertyProvider} is used to retrieve JMX metrics from a given + * {@link Request}. This class will delegate responsibility for actually + * retrieving JMX data from a remote URL to the {@link MetricsRetrievalService}. + * It will also leverage the {@link MetricsRetrievalService} to provide cached + * {@link JMXMetricHolder} instances for given URLs. + *

+ * This is because the REST API workflow will attempt to read data from this + * provider during the context of a live Jetty thread. As a result, any attempt + * to read remote resources will cause a delay in returning a response code. On + * small clusters this normally isn't a problem. However, as the cluster + * increases in size, the thread pool would not be able to keep pace and would + * eventually cause REST API request threads to wait while remote JMX data is + * retrieved. + *

+ * In general, this type of federated data collection is a poor design. Even + * with a large enough threadpool there are simple use cases where the model + * breaks down: + *

    + *
  • Concurrent users logged in, each creating their own requests and + * exhausting the threadpool + *
  • Misbehaving JMX endpoints which don't respond in a timely manner + *
+ *

+ * For these reasons the {@link JMXPropertyProvider} will use a completely + * asynchronous model through the {@link MetricsRetrievalService}. It should be + * noted that this provider is still an instance of a + * {@link ThreadPoolEnabledPropertyProvider} due to the nature of how the cached + * {@link JMXMetricHolder} instances need to be looped over an parsed. */ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { @@ -54,9 +84,6 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { private static final String PORT_KEY = "tag.port"; private static final String DOT_REPLACEMENT_CHAR = "#"; - private final static ObjectReader jmxObjectReader; - private final static ObjectReader stormObjectReader; - private static final Map DEFAULT_JMX_PORTS = new HashMap(); static { @@ -69,16 +96,6 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { DEFAULT_JMX_PORTS.put("NODEMANAGER", "8042"); DEFAULT_JMX_PORTS.put("JOURNALNODE", "8480"); DEFAULT_JMX_PORTS.put("STORM_REST_API", "8745"); - - ObjectMapper jmxObjectMapper = new ObjectMapper(); - jmxObjectMapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false); - jmxObjectReader = jmxObjectMapper.reader(JMXMetricHolder.class); - - TypeReference> typeRef - = new TypeReference< - HashMap - >() {}; - stormObjectReader = jmxObjectMapper.reader(typeRef); } protected final static Logger LOG = @@ -101,6 +118,13 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { private final Map clusterComponentPortsMap; + /** + * Used to submit asynchronous requests for remote metrics as well as querying + * cached metrics. 
+ */ + @Inject + private MetricsRetrievalService metricsRetrievalService; + // ----- Constructors ------------------------------------------------------ /** @@ -115,14 +139,16 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { * @param componentNamePropertyId the component name property id * @param statePropertyId the state property id */ - public JMXPropertyProvider(Map> componentMetrics, - StreamProvider streamProvider, - JMXHostProvider jmxHostProvider, - MetricHostProvider metricHostProvider, - String clusterNamePropertyId, - String hostNamePropertyId, - String componentNamePropertyId, - String statePropertyId) { + @AssistedInject + JMXPropertyProvider( + @Assisted("componentMetrics") Map> componentMetrics, + @Assisted("streamProvider") StreamProvider streamProvider, + @Assisted("jmxHostProvider") JMXHostProvider jmxHostProvider, + @Assisted("metricHostProvider") MetricHostProvider metricHostProvider, + @Assisted("clusterNamePropertyId") String clusterNamePropertyId, + @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId, + @Assisted("componentNamePropertyId") String componentNamePropertyId, + @Assisted("statePropertyId") @Nullable String statePropertyId) { super(componentMetrics, hostNamePropertyId, metricHostProvider, clusterNamePropertyId); @@ -132,7 +158,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { this.hostNamePropertyId = hostNamePropertyId; this.componentNamePropertyId = componentNamePropertyId; this.statePropertyId = statePropertyId; - this.clusterComponentPortsMap = new HashMap<>(); + clusterComponentPortsMap = new HashMap<>(); } // ----- helper methods ---------------------------------------------------- @@ -174,6 +200,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { unsupportedIds.add(id); } } + ids.removeAll(unsupportedIds); if (ids.isEmpty()) { @@ -205,53 +232,50 @@ public class JMXPropertyProvider extends 
ThreadPoolEnabledPropertyProvider { return resource; } - InputStream in = null; - String spec = null; - try { + for (String hostName : hostNames) { try { - for (String hostName : hostNames) { - try { - String port = getPort(clusterName, componentName, hostName, httpsEnabled); - if (port == null) { - LOG.warn("Unable to get JMX metrics. No port value for " + componentName); - return resource; - } - spec = getSpec(protocol, hostName, port, "/jmx"); - if (LOG.isDebugEnabled()) { - LOG.debug("Spec: " + spec); - } - in = streamProvider.readFrom(spec); - // if the ticket becomes invalid (timeout) then bail out - if (!ticket.isValid()) { - return resource; - } + String port = getPort(clusterName, componentName, hostName, httpsEnabled); + if (port == null) { + LOG.warn("Unable to get JMX metrics. No port value for " + componentName); + return resource; + } - getHadoopMetricValue(in, ids, resource, request, ticket); + // build the URL + String jmxUrl = getSpec(protocol, hostName, port, "/jmx"); - } catch (IOException e) { - AmbariException detailedException = new AmbariException( - String.format("Unable to get JMX metrics from the host %s for the component %s. Spec: %s", hostName, componentName, spec), e); - logException(detailedException); - } + // always submit a request to cache the latest data + metricsRetrievalService.submitJMXRequest(streamProvider, jmxUrl); + + // check to see if there is a cached value and use it if there is + JMXMetricHolder jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(jmxUrl); + if (null == jmxMetricHolder) { + return resource; } - } finally { - if (in != null) { - in.close(); + + // if the ticket becomes invalid (timeout) then bail out + if (!ticket.isValid()) { + return resource; } + + getHadoopMetricValue(jmxMetricHolder, ids, resource, request, ticket); + + } catch (IOException e) { + AmbariException detailedException = new AmbariException(String.format( + "Unable to get JMX metrics from the host %s for the component %s. 
Spec: %s", hostName, + componentName, spec), e); + logException(detailedException); } - } catch (IOException e) { - logException(e); } + return resource; } /** * Hadoop-specific metrics fetching */ - private void getHadoopMetricValue(InputStream in, Set ids, + private void getHadoopMetricValue(JMXMetricHolder metricHolder, Set ids, Resource resource, Request request, Ticket ticket) throws IOException { - JMXMetricHolder metricHolder = jmxObjectReader.readValue(in); Map> categories = new HashMap>(); String componentName = (String) resource.getPropertyValue(componentNamePropertyId); @@ -377,7 +401,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { private String getJMXProtocol(String clusterName, String componentName) { return jmxHostProvider.getJMXProtocol(clusterName, componentName); } - + private Set getHosts(Resource resource, String clusterName, String componentName) { return hostNamePropertyId == null ? jmxHostProvider.getHostNames(clusterName, componentName) : http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java new file mode 100644 index 0000000..0882ca2 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.controller.metrics; + +import java.util.Map; + +import javax.annotation.Nullable; + +import org.apache.ambari.server.controller.internal.PropertyInfo; +import org.apache.ambari.server.controller.jmx.JMXHostProvider; +import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; +import org.apache.ambari.server.controller.spi.PropertyProvider; +import org.apache.ambari.server.controller.utilities.StreamProvider; + +import com.google.inject.assistedinject.Assisted; + +/** + * The {@link MetricPropertyProviderFactory} is used to provide injected + * instances of {@link PropertyProvider}s which retrieve metrics. + */ +public interface MetricPropertyProviderFactory { + + /** + * Gets a Guice-inject instance of a {@link JMXPropertyProvider}. + * + * @param componentMetrics + * the map of supported metrics + * @param streamProvider + * the stream provider + * @param jmxHostProvider + * the JMX host mapping + * @param metricHostProvider + * the host mapping + * @param clusterNamePropertyId + * the cluster name property id + * @param hostNamePropertyId + * the host name property id + * @param componentNamePropertyId + * the component name property id + * @param statePropertyId + * the state property id + * @return the instantiated and injected {@link JMXPropertyProvider}. 
+ */ + JMXPropertyProvider createJMXPropertyProvider( + @Assisted("componentMetrics") Map> componentMetrics, + @Assisted("streamProvider") StreamProvider streamProvider, + @Assisted("jmxHostProvider") JMXHostProvider jmxHostProvider, + @Assisted("metricHostProvider") MetricHostProvider metricHostProvider, + @Assisted("clusterNamePropertyId") String clusterNamePropertyId, + @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId, + @Assisted("componentNamePropertyId") String componentNamePropertyId, + @Assisted("statePropertyId") @Nullable String statePropertyId); + + /** + * /** Create a REST property provider. + * + * @param metricsProperties + * the map of per-component metrics properties + * @param componentMetrics + * the map of supported metrics for component + * @param streamProvider + * the stream provider + * @param metricHostProvider + * metricsHostProvider instance + * @param clusterNamePropertyId + * the cluster name property id + * @param hostNamePropertyId + * the host name property id, or {@code null} if none. + * @param componentNamePropertyId + * the component name property id + * @param statePropertyId + * the state property id or {@code null} if none. + * @param componentName + * the name of the component which the metric is for, or {@code null} + * if none. + * @return the instantiated and injected {@link RestMetricsPropertyProvider}. 
+ */ + RestMetricsPropertyProvider createRESTMetricsPropertyProvider( + @Assisted("metricsProperties") Map metricsProperties, + @Assisted("componentMetrics") Map> componentMetrics, + @Assisted("streamProvider") StreamProvider streamProvider, + @Assisted("metricHostProvider") MetricHostProvider metricHostProvider, + @Assisted("clusterNamePropertyId") String clusterNamePropertyId, + @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId, + @Assisted("componentNamePropertyId") String componentNamePropertyId, + @Assisted("statePropertyId") @Nullable String statePropertyId, + @Assisted("componentName") @Nullable String componentName); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java index 6f2a134..cbe827a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java @@ -19,12 +19,15 @@ package org.apache.ambari.server.controller.metrics; -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.reflect.TypeToken; -import com.google.gson.stream.JsonReader; -import com.google.inject.Inject; -import com.google.inject.Injector; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.AmbariManagementController; import 
org.apache.ambari.server.controller.internal.PropertyInfo; @@ -36,48 +39,63 @@ import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.utilities.StreamProvider; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Hashtable; -import java.util.Map; -import java.util.Set; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.reflect.TypeToken; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; /** - * WARNING: Class should be thread-safe! + * Resolves metrics like api/cluster/summary/nimbus.uptime For every metric, + * finds a relevant JSON value and returns it as a resource property. + *

+ * This class will delegate responsibility for actually retrieving JSON data + * from a remote URL to the {@link MetricsRetrievalService}. It will also + * leverage the {@link MetricsRetrievalService} to provide cached {@link Map} + * instances for given URLs. *

- * Resolves metrics like api/cluster/summary/nimbus.uptime - * For every metric, finds a relevant JSON value and returns is as - * a resource property. + * This is because the REST API workflow will attempt to read data from this + * provider during the context of a live Jetty thread. As a result, any attempt + * to read remote resources will cause a delay in returning a response code. On + * small clusters this mormally isn't a problem. However, as the cluster + * increases in size, the thread pool would not be able to keep pace and would + * eventually cause REST API request threads to wait while remote JSON data is + * retrieved. */ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvider { protected final static Logger LOG = LoggerFactory.getLogger(RestMetricsPropertyProvider.class); - private static Map instances = - new Hashtable(); - @Inject private AmbariManagementController amc; @Inject private Clusters clusters; + /** + * Used to parse the REST JSON metrics. + */ + @Inject + private Gson gson; + + /** + * Used to submit asynchronous requests for remote metrics as well as querying + * cached metrics. 
+ */ + @Inject + private MetricsRetrievalService metricsRetrievalService; + private final Map metricsProperties; private final StreamProvider streamProvider; private final String clusterNamePropertyId; private final String componentNamePropertyId; private final String statePropertyId; - private MetricHostProvider metricHostProvider; private final String componentName; private static final String DEFAULT_PORT_PROPERTY = "default_port"; @@ -116,26 +134,23 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid * @param componentNamePropertyId the component name property id * @param statePropertyId the state property id */ - public RestMetricsPropertyProvider( - Injector injector, - Map metricsProperties, - Map> componentMetrics, - StreamProvider streamProvider, - MetricHostProvider metricHostProvider, - String clusterNamePropertyId, - String hostNamePropertyId, - String componentNamePropertyId, - String statePropertyId, - String componentName){ - + @AssistedInject + RestMetricsPropertyProvider( + @Assisted("metricsProperties") Map metricsProperties, + @Assisted("componentMetrics") Map> componentMetrics, + @Assisted("streamProvider") StreamProvider streamProvider, + @Assisted("metricHostProvider") MetricHostProvider metricHostProvider, + @Assisted("clusterNamePropertyId") String clusterNamePropertyId, + @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId, + @Assisted("componentNamePropertyId") String componentNamePropertyId, + @Assisted("statePropertyId") @Nullable String statePropertyId, + @Assisted("componentName") @Nullable String componentName) { super(componentMetrics, hostNamePropertyId, metricHostProvider, clusterNamePropertyId); this.metricsProperties = metricsProperties; this.streamProvider = streamProvider; this.clusterNamePropertyId = clusterNamePropertyId; this.componentNamePropertyId = componentNamePropertyId; this.statePropertyId = statePropertyId; - this.metricHostProvider = metricHostProvider; - 
injector.injectMembers(this); this.componentName = componentName; } @@ -223,24 +238,28 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid HashMap> urls = extractPropertyURLs(resultIds, propertyInfos); for (String url : urls.keySet()) { - String spec = null; + String spec = getSpec(protocol, hostname, port, url); + + // always submit a request to cache the latest data + metricsRetrievalService.submitRESTRequest(streamProvider, spec); + + // check to see if there is a cached value and use it if there is + Map jsonMap = metricsRetrievalService.getCachedRESTMetric(spec); + if (null == jsonMap) { + return resource; + } + + if (!ticket.isValid()) { + return resource; + } + try { - spec = getSpec(protocol, hostname, port, url); - InputStream in = streamProvider.readFrom(spec); - if (!ticket.isValid()) { - if (in != null) { - in.close(); - } - return resource; - } - try { - extractValuesFromJSON(in, urls.get(url), resource, propertyInfos); - } finally { - in.close(); - } - } catch (IOException e) { - AmbariException detailedException = new AmbariException( - String.format("Unable to get REST metrics from the host %s for the component %s. Spec: %s", hostname, resourceComponentName, spec), e); + extractValuesFromJSON(jsonMap, urls.get(url), resource, propertyInfos); + } catch (AmbariException ambariException) { + AmbariException detailedException = new AmbariException(String.format( + "Unable to get REST metrics from the for %s at $s", resourceComponentName, spec), + ambariException); + logException(detailedException); } } @@ -418,25 +437,17 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid /** - * Extracts requested properties from a given JSON input stream into - * resource. + * Extracts requested properties from a parsed {@link Map} of {@link String}. 
* - * @param jsonStream input stream that contains JSON - * @param requestedPropertyIds a set of property IDs - * that should be fetched for this URL - * @param resource all extracted values are placed into resource + * @param requestedPropertyIds + * a set of property IDs that should be fetched for this URL + * @param resource + * all extracted values are placed into resource */ - private void extractValuesFromJSON(InputStream jsonStream, - Set requestedPropertyIds, - Resource resource, - Map propertyInfos) - throws IOException { - Gson gson = new Gson(); - Type type = new TypeToken>() { - }.getType(); - JsonReader jsonReader = new JsonReader( - new BufferedReader(new InputStreamReader(jsonStream))); - Map jsonMap = gson.fromJson(jsonReader, type); + private void extractValuesFromJSON(Map jsonMap, + Set requestedPropertyIds, Resource resource, Map propertyInfos) + throws AmbariException { + Type type = new TypeToken>() {}.getType(); for (String requestedPropertyId : requestedPropertyIds) { PropertyInfo propertyInfo = propertyInfos.get(requestedPropertyId); String metricsPath = propertyInfo.getPropertyId(); @@ -450,7 +461,8 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid "Can not fetch %dth element of document path (%s) " + "from json. 
Wrong metrics path: %s", i, pathElement, metricsPath); - throw new IOException(message); + + throw new AmbariException(message); } Object jsonSubElement = jsonMap.get(pathElement); if (i == docPath.length - 1) { // Reached target document section http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java index 6f4a6ea..367c339 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java @@ -18,19 +18,6 @@ package org.apache.ambari.server.controller.metrics; -import com.google.common.base.Throwables; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.apache.ambari.server.configuration.Configuration; -import org.apache.ambari.server.controller.internal.AbstractPropertyProvider; -import org.apache.ambari.server.controller.internal.PropertyInfo; -import org.apache.ambari.server.controller.spi.Predicate; -import org.apache.ambari.server.controller.spi.Request; -import org.apache.ambari.server.controller.spi.Resource; -import org.apache.ambari.server.controller.spi.SystemException; -import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; -import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService; - import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -39,17 +26,57 @@ import java.util.concurrent.Callable; import 
java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.inject.Inject; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.internal.AbstractPropertyProvider; +import org.apache.ambari.server.controller.internal.PropertyInfo; +import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.PropertyProvider; +import org.apache.ambari.server.controller.spi.Request; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.spi.SystemException; +import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService; +import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; + +import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Inject; /** - * Unites common functionality for multithreaded metrics providers - * (JMX and REST as of now). Shares the same pool of executor threads. + * Unites common functionality for multithreaded metrics providers (JMX and REST + * as of now). Shares the same pool of executor threads across all + * implementations. + *

+ * This {@link PropertyProvider} should not be mistaken for a way to perform + * expensive operations, as it is still called as part of the incoming REST + * Jetty request. It is poor design to have UI threads from the web client + * waiting on expensive operations from a {@link PropertyProvider}, even if they + * are spread across multiple threads. + *

+ * Instead, this {@link PropertyProvider} is useful for spreading many small, + * quick operations across a threadpool. This is why the known implementations + * of this class (such as the {@link JMXPropertyProvider}) use a cache instead + * of reaching out to network endpoints on their own. + *

+ * This is also why the {@link ThreadPoolExecutor} used here has an unbounded + * worker queue and essentially a fixed core size to perform its work. When + * {@link Callable}s are rejected because of a worker queue exhaustion, they are + * never submitted for execution, yet the {@link Future} instance is still + * returned. Therefore, if the queue is ever exhausted, incoming REST API + * requests must wait the entire {@link CompletionService#poll(long, TimeUnit)} + * timeout before skipping the result and returning control. + * */ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractPropertyProvider { + protected static Configuration configuration; + /** * Host states that make available metrics collection */ @@ -59,29 +86,24 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty private final String clusterNamePropertyId; /** - * Executor service is shared between all childs of current class + * Executor service is shared between all instances. */ private static ThreadPoolExecutor EXECUTOR_SERVICE; private static int THREAD_POOL_CORE_SIZE; private static int THREAD_POOL_MAX_SIZE; + private static int THREAD_POOL_WORKER_QUEUE_SIZE; + private static long COMPLETION_SERVICE_POLL_TIMEOUT; private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L; @Inject public static void init(Configuration configuration) { THREAD_POOL_CORE_SIZE = configuration.getPropertyProvidersThreadPoolCoreSize(); THREAD_POOL_MAX_SIZE = configuration.getPropertyProvidersThreadPoolMaxSize(); + THREAD_POOL_WORKER_QUEUE_SIZE = configuration.getPropertyProvidersWorkerQueueSize(); + COMPLETION_SERVICE_POLL_TIMEOUT = configuration.getPropertyProvidersCompletionServiceTimeout(); EXECUTOR_SERVICE = initExecutorService(); } - private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L; - /** - * The amount of time that this provider will wait for JMX metric values to be - * returned from the JMX sources. 
If no results are returned for this amount of - * time then the request to populate the resources will fail. - */ - protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS; - public static final String TIMED_OUT_MSG = "Timed out waiting for metrics."; - private static final Cache exceptionsCache = CacheBuilder.newBuilder() .expireAfterWrite(5, TimeUnit.MINUTES) .build(); @@ -114,8 +136,15 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty THREAD_POOL_CORE_SIZE, THREAD_POOL_MAX_SIZE, THREAD_POOL_TIMEOUT_MILLIS, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS, + THREAD_POOL_WORKER_QUEUE_SIZE); threadPoolExecutor.allowCoreThreadTimeOut(true); + + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat( + "ambari-property-provider-thread-%d").build(); + + threadPoolExecutor.setThreadFactory(threadFactory); + return threadPoolExecutor; } @@ -128,36 +157,47 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty if(!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) { return resources; } + // Get a valid ticket for the request. - Ticket ticket = new Ticket(); + final Ticket ticket = new Ticket(); - CompletionService completionService = + // in most cases, the buffered completion service will not be utlized for + // its advantages since the worker queue is unbounded. However, if is + // configured with a boundary, then the buffered service ensures that no + // requests are discarded. + final CompletionService completionService = new BufferedThreadPoolExecutorCompletionService(EXECUTOR_SERVICE); // In a large cluster we could have thousands of resources to populate here. // Distribute the work across multiple threads. 
for (Resource resource : resources) { - completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket)); + completionService.submit( + getPopulateResourceCallable(resource, request, predicate, ticket)); } Set keepers = new HashSet(); try { - for (int i = 0; i < resources.size(); ++ i) { - Future resourceFuture = - completionService.poll(populateTimeout, TimeUnit.MILLISECONDS); + for (int i = 0; i < resources.size(); ++i) { + Future resourceFuture = completionService.poll(COMPLETION_SERVICE_POLL_TIMEOUT, + TimeUnit.MILLISECONDS); if (resourceFuture == null) { - // its been more than the populateTimeout since the last callable completed ... - // invalidate the ticket to abort the threads and don't wait any longer + // it's been more than the poll timeout since the last callable + // completed ... + // invalidate the ticket to abort the threads and don't wait any + // longer ticket.invalidate(); - LOG.error(TIMED_OUT_MSG); + LOG.error("Timed out after waiting {}ms waiting for request {}", + COMPLETION_SERVICE_POLL_TIMEOUT, request); + + // stop iterating break; - } else { - // future should already be completed... no need to wait on get - Resource resource = resourceFuture.get(); - if (resource != null) { - keepers.add(resource); - } + } + + // future should already be completed... 
no need to wait on get + Resource resource = resourceFuture.get(); + if (resource != null) { + keepers.add(resource); } } } catch (InterruptedException e) { @@ -165,6 +205,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty } catch (ExecutionException e) { rethrowSystemException(e.getCause()); } + return keepers; } @@ -181,6 +222,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty private Callable getPopulateResourceCallable( final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) { return new Callable() { + @Override public Resource call() throws SystemException { return populateResource(resource, request, predicate, ticket); } @@ -211,8 +253,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty protected void setPopulateTimeout(long populateTimeout) { - this.populateTimeout = populateTimeout; - + COMPLETION_SERVICE_POLL_TIMEOUT = populateTimeout; } @@ -247,7 +288,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty * @return the error message that was logged */ protected static String logException(final Throwable throwable) { - final String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage(); + final String msg = "Caught exception getting metrics : " + throwable.getLocalizedMessage(); // JsonParseException includes InputStream's hash code into the message. // getMessage and printStackTrace returns a different String every time. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java index 4d6daa6..9f31fef 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java @@ -30,13 +30,18 @@ import java.util.concurrent.TimeUnit; * and uses its thread pool to execute tasks - buffering any tasks that * overflow. Such buffered tasks are later re-submitted to the executor when * finished tasks are polled or taken. - * + *

* This class overrides the {@link ThreadPoolExecutor}'s - * {@link RejectedExecutionHandler} to collect overflowing tasks. - * + * {@link RejectedExecutionHandler} to collect overflowing tasks. When + * {@link #poll(long, TimeUnit)} is invoked, this class will attempt to + * re-submit any overflow tasks before waiting the specified amount of time. + * This will prevent blocking on an empty completion queue since the parent + * {@link ExecutorCompletionService} doesn't have any idea that there are tasks + * waiting to be resubmitted. + *

* The {@link ScalingThreadPoolExecutor} can be used in conjunction with this * class to provide an efficient buffered, scaling thread pool implementation. - * + * * @param */ public class BufferedThreadPoolExecutorCompletionService extends ExecutorCompletionService { @@ -84,15 +89,50 @@ public class BufferedThreadPoolExecutorCompletionService extends ExecutorComp return poll; } + /** + * {@inheritDoc} + *

+ * The goal of this method is to prevent blocking if there are tasks waiting + * in the overflow queue. In the event that the overflow queue was populated, + * we should not blindly wait on the parent + * {@link ExecutorCompletionService#poll(long, TimeUnit)} method. Instead, we + * should ensure that we have submitted at least one of our own tasks for + * completion. + */ @Override public Future poll(long timeout, TimeUnit unit) throws InterruptedException { - Future poll = super.poll(timeout, unit); - if (!executor.isTerminating() && !overflowQueue.isEmpty() && executor.getActiveCount() < executor.getMaximumPoolSize()) { - Runnable overflow = overflowQueue.poll(); - if (overflow != null) { - executor.execute(overflow); + // first poll for anything that's already completed and do a short-circuit return + Future poll = super.poll(); + if( null != poll ) { + // there's something to return; that's great, but let's also see if we can + // submit anything in the overflow queue back to the completion service + if (!executor.isTerminating() && !overflowQueue.isEmpty() + && executor.getActiveCount() < executor.getMaximumPoolSize()) { + Runnable overflow = overflowQueue.poll(); + if (overflow != null) { + executor.execute(overflow); + } } + + // return the future + return poll; } + + // nothing completed yet, so check active thread count - if there is nothing + // working either, then that means that the parent completion service thinks + // it's done - we should submit our own tasks + if (executor.getActiveCount() == 0) { + if (!executor.isTerminating() && !overflowQueue.isEmpty()) { + Runnable overflow = overflowQueue.poll(); + if (overflow != null) { + executor.execute(overflow); + } + } + } + + // now that we've confirmed that either the parent completion service is + // still working or we submitted our own task, we can poll with a timeout + poll = super.poll(timeout, unit); return poll; } } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/384a4bea/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java index 7a5e479..864e3fb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java @@ -28,21 +28,45 @@ import java.util.concurrent.TimeUnit; * possibly several pooled threads. It also scales up the number of threads in * the pool if the number of submissions exceeds the core size of the pool. The * pool can scale up to the specified maximum pool size. - * + * * If the number of submissions exceeds the sum of the core and maximum size of * the thread pool, the submissions are then handled by the provided * {@link RejectedExecutionHandler}. - * + * * If the overflowing submissions need to be handled, * {@link BufferedThreadPoolExecutorCompletionService} can be used to buffer up * overflowing submissions for later submission as threads become available. - * + * * @see BufferedThreadPoolExecutorCompletionService */ public class ScalingThreadPoolExecutor extends ThreadPoolExecutor { - public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue(corePoolSize)); + /** + * Constructor. + * + * @param corePoolSize + * the number of threads which will be considered as "core". If core + * thread timeout is not enabled, then this will be the minimum + * number of threads, always. 
With core timeout enabled, then this + * will be the number of threads spun up to handle incoming work + * regardless of worker queue size. + * @param maximumPoolSize + * the maximum number of threads which can be spawned when an + * {@link ExecutorService} encounters a failure inserting into the + * work queue. These threads are not spawned to handle entries into + * the work queue; they are only spawned if the queue fills and a + * {@link RejectedExecutionHandler} is invoked. + * @param keepAliveTime + * the TTL for core threads + * @param unit + * the time unit for core threads + * @param workerQueueSize + * the size of the worker queue. The threads specified by + * {@link #getMaximumPoolSize()} will only be created if the worker + * queue is exhausted. + */ + public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, int workerQueueSize) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue(workerQueueSize)); } - }