ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject [2/2] ambari git commit: AMBARI-16913 - Web Client Requests Handled By Jetty Should Not Be Blocked By JMX Property Providers (jonathanhurley)
Date Sat, 28 May 2016 18:48:54 GMT
AMBARI-16913 - Web Client Requests Handled By Jetty Should Not Be Blocked By JMX Property Providers (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/eec799d1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/eec799d1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/eec799d1

Branch: refs/heads/trunk
Commit: eec799d16af778a816ffdf3a52bfee0a319c9103
Parents: 2fcc947
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Fri May 27 13:26:30 2016 -0400
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Sat May 28 11:16:20 2016 -0400

----------------------------------------------------------------------
 .../org/apache/ambari/server/AmbariService.java |   5 +-
 .../server/configuration/Configuration.java     | 154 +++++-
 .../controller/AmbariManagementController.java  |   9 +
 .../AmbariManagementControllerImpl.java         |   9 +
 .../ambari/server/controller/AmbariServer.java  |  16 +-
 .../server/controller/ControllerModule.java     |  13 +-
 .../internal/AbstractProviderModule.java        |  23 +-
 .../internal/StackDefinedPropertyProvider.java  |  60 ++-
 .../controller/jmx/JMXPropertyProvider.java     | 166 +++---
 .../metrics/MetricPropertyProviderFactory.java  | 103 ++++
 .../metrics/RestMetricsPropertyProvider.java    | 160 +++---
 .../ThreadPoolEnabledPropertyProvider.java      | 131 +++--
 ...eredThreadPoolExecutorCompletionService.java |  58 +-
 .../utilities/ScalingThreadPoolExecutor.java    |  36 +-
 .../state/services/MetricsRetrievalService.java | 539 +++++++++++++++++++
 .../server/configuration/ConfigurationTest.java |  72 +++
 .../StackDefinedPropertyProviderTest.java       |  48 +-
 .../metrics/JMXPropertyProviderTest.java        |  64 ++-
 .../RestMetricsPropertyProviderTest.java        |  40 +-
 ...ThreadPoolExecutorCompletionServiceTest.java |  15 +-
 .../utils/SynchronousThreadPoolExecutor.java    |  85 +++
 21 files changed, 1493 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java b/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java
index 186e272..86ad1cf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/AmbariService.java
@@ -22,15 +22,14 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.ServiceManager;
 import com.google.inject.ScopeAnnotation;
 import com.google.inject.Singleton;
 
 /**
  * The {@link AmbariService} annotation is used to register a class that
- * implements Guava's {@link AbstractScheduledService} with the
- * {@link ServiceManager}.
+ * implements Guava's {@link Service} with the {@link ServiceManager}.
  * <p/>
  * Classes with this annotation are bound as singletons and automatically
  * injected with their members. There is not need to use {@link Singleton} or

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 0c2fbba..d104cb6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.ambari.annotations.Experimental;
 import org.apache.ambari.annotations.ExperimentalFeature;
@@ -48,6 +50,7 @@ import org.apache.ambari.server.security.authorization.LdapServerProperties;
 import org.apache.ambari.server.security.authorization.jwt.JwtAuthenticationProperties;
 import org.apache.ambari.server.security.encryption.CertificateUtils;
 import org.apache.ambari.server.security.encryption.CredentialProvider;
+import org.apache.ambari.server.state.services.MetricsRetrievalService;
 import org.apache.ambari.server.state.stack.OsFamily;
 import org.apache.ambari.server.utils.AmbariPath;
 import org.apache.ambari.server.utils.Parallel;
@@ -566,10 +569,20 @@ public class Configuration {
   private static final int VIEW_REQUEST_THREADPOOL_TIMEOUT_DEFAULT = 2000;
 
 
+  /**
+   * Threadpool sizing based on the number of available processors multiplied by 2.
+   */
+  public static final int PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT = 2 * Runtime.getRuntime().availableProcessors();
+
+  /**
+   * Threadpool sizing based on the number of available processors multiplied by 4.
+   */
+  public static final int PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT = 4 * Runtime.getRuntime().availableProcessors();
+
   public static final String PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY = "server.property-provider.threadpool.size.max";
-  public static final int PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_DEFAULT = 4 * Runtime.getRuntime().availableProcessors();
   public static final String PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY = "server.property-provider.threadpool.size.core";
-  public static final int PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_DEFAULT = 2 * Runtime.getRuntime().availableProcessors();
+  public static final String PROPERTY_PROVIDER_THREADPOOL_WORKER_QUEUE_SIZE = "server.property-provider.threadpool.worker.size";
+  public static final String PROPERTY_PROVIDER_THREADPOOL_COMPLETION_TIMEOUT = "server.property-provider.threadpool.completion.timeout";
 
   private static final String SERVER_HTTP_SESSION_INACTIVE_TIMEOUT = "server.http.session.inactive_timeout";
 
@@ -720,6 +733,39 @@ public class Configuration {
 
   public static final String ALERTS_SNMP_DISPATCH_UDP_PORT = "alerts.snmp.dispatcher.udp.port";
 
+  /**
+   * The amount of time that the {@link MetricsRetrievalService} will cache
+   * retrieved metric data.
+   */
+  public static final String METRIC_RETRIEVAL_SERVICE_CACHE_TIMEOUT = "metrics.retrieval-service.cache.timeout";
+
+  /**
+   * The priorty of the {@link Thread}s used by the
+   * {@link MetricsRetrievalService}. This is a value in between
+   * {@link Thread#MIN_PRIORITY} and {@link Thread#MAX_PRIORITY}.
+   */
+  public static final String METRIC_RETRIEVAL_SERVICE_THREAD_PRIORITY = "server.metrics.retrieval-service.thread.priority";
+
+  /**
+   * The maximum size of the threadpool for the {@link MetricsRetrievalService}.
+   * This value is only applicable if the
+   * {@link #METRIC_RETRIEVAL_SERVICE_THREADPOOL_WORKER_QUEUE_SIZE} is small
+   * enough to trigger the {@link ThreadPoolExecutor} to create new threads.
+   */
+  public static final String METRIC_RETRIEVAL_SERVICE_THREADPOOL_MAX_SIZE = "server.metrics.retrieval-service.threadpool.size.max";
+
+  /**
+   * The core size of the threadpool for the {@link MetricsRetrievalService}.
+   */
+  public static final String METRIC_RETRIEVAL_SERVICE_THREADPOOL_CORE_SIZE = "server.metrics.retrieval-service.threadpool.size.core";
+
+  /**
+   * The size of the worker queue for the {@link MetricsRetrievalService}. The
+   * larger this queue is, the less likely it will be to create more threads
+   * beyond the core size.
+   */
+  public static final String METRIC_RETRIEVAL_SERVICE_THREADPOOL_WORKER_QUEUE_SIZE = "server.metrics.retrieval-service.threadpool.worker.size";
+
   private static final Logger LOG = LoggerFactory.getLogger(
     Configuration.class);
 
@@ -2465,7 +2511,7 @@ public class Configuration {
    */
   public int getPropertyProvidersThreadPoolCoreSize() {
     return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY,
-      String.valueOf(PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_DEFAULT)));
+        String.valueOf(PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT)));
   }
 
   /**
@@ -2475,7 +2521,31 @@ public class Configuration {
    */
   public int getPropertyProvidersThreadPoolMaxSize() {
     return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY,
-      String.valueOf(PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_DEFAULT)));
+        String.valueOf(PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT)));
+  }
+
+  /**
+   * Get property-providers' worker queue size. This will return
+   * {@link Integer#MAX_VALUE} if not specified which will allow an unbounded
+   * queue and essentially a fixed core threadpool size.
+   *
+   * @return the property-providers' worker queue size.
+   */
+  public int getPropertyProvidersWorkerQueueSize() {
+    return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_WORKER_QUEUE_SIZE,
+        String.valueOf(Integer.MAX_VALUE)));
+  }
+
+  /**
+   * Get property-providers' timeout value in milliseconds for waiting on the
+   * completion of submitted {@link Callable}s. This will return {@value 5000}
+   * if not specified.
+   *
+   * @return the property-providers' completion srevice timeout, in millis.
+   */
+  public long getPropertyProvidersCompletionServiceTimeout() {
+    return Long.parseLong(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_COMPLETION_TIMEOUT,
+        String.valueOf(5000)));
   }
 
   /**
@@ -3037,4 +3107,80 @@ public class Configuration {
   public boolean isLdapAlternateUserSearchEnabled() {
     return Boolean.parseBoolean(properties.getProperty(LDAP_ALT_USER_SEARCH_ENABLED_KEY, LDAP_ALT_USER_SEARCH_ENABLED_DEFAULT));
   }
+
+  /**
+   * Gets the number of minutes that data cached by the
+   * {@link MetricsRetrievalService} is kept. The longer this value is, the
+   * older the data will be when a user first logs in. After that first login,
+   * data will be updated by the {@link MetricsRetrievalService} as long as
+   * incoming REST requests are made.
+   * <p/>
+   * It is recommended that this value be longer rather than shorter since the
+   * performance benefit of the cache greatly outweighs the data loaded after
+   * first login.
+   *
+   * @return the number of minutes, defaulting to 30 if not specified.
+   */
+  public int getMetricsServiceCacheTimeout() {
+    return Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_CACHE_TIMEOUT, "30"));
+  }
+
+  /**
+   * Gets the priority of the {@link Thread}s used by the
+   * {@link MetricsRetrievalService}. This will be a value within the range of
+   * {@link Thread#MIN_PRIORITY} and {@link Thread#MAX_PRIORITY}.
+   *
+   * @return the thread proprity.
+   */
+  public int getMetricsServiceThreadPriority() {
+    int priority = Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREAD_PRIORITY,
+        String.valueOf(Thread.NORM_PRIORITY)));
+
+    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
+      priority = Thread.NORM_PRIORITY;
+    }
+
+    return priority;
+  }
+
+  /**
+   * Gets the core pool size used for the {@link MetricsRetrievalService}.
+   *
+   * @return the core pool size or
+   *         {@value #PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT} if not
+   *         specified.
+   */
+  public int getMetricsServiceThreadPoolCoreSize() {
+    return Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREADPOOL_CORE_SIZE,
+        String.valueOf(PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT)));
+  }
+
+  /**
+   * Gets the max pool size used for the {@link MetricsRetrievalService}.
+   * Threads will only be increased up to this value of the worker queue is
+   * exhauseted and rejects the new task.
+   *
+   * @return the max pool size, or
+   *         {@value PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT} if not
+   *         specified.
+   * @see #getMetricsServiceWorkerQueueSize()
+   */
+  public int getMetricsServiceThreadPoolMaxSize() {
+    return Integer.parseInt(properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREADPOOL_MAX_SIZE,
+        String.valueOf(PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT)));
+  }
+
+  /**
+   * Gets the queue size of the worker queue for the
+   * {@link MetricsRetrievalService}.
+   *
+   * @return the worker queue size, or {@code 10 *}
+   *         {@link #getMetricsServiceThreadPoolMaxSize()} if not specified.
+   */
+  public int getMetricsServiceWorkerQueueSize() {
+    return Integer.parseInt(
+        properties.getProperty(METRIC_RETRIEVAL_SERVICE_THREADPOOL_WORKER_QUEUE_SIZE,
+            String.valueOf(10 * getMetricsServiceThreadPoolMaxSize())));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index d6b9d0e..10a1f8c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -29,6 +29,7 @@ import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.internal.RequestStageContainer;
+import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
 import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.metadata.RoleCommandOrder;
 import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
@@ -797,6 +798,14 @@ public interface AmbariManagementController {
   TimelineMetricCacheProvider getTimelineMetricCacheProvider();
 
   /**
+   * Gets the {@link MetricPropertyProviderFactory} that was injected into this
+   * class. This is a terrible pattern.
+   *
+   * @return the injected {@link MetricPropertyProviderFactory}
+   */
+  MetricPropertyProviderFactory getMetricPropertyProviderFactory();
+
+  /**
    * Returns KerberosHelper instance
    * @return
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index f4a615c..79a4e90 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -93,6 +93,7 @@ import org.apache.ambari.server.controller.internal.RequestStageContainer;
 import org.apache.ambari.server.controller.internal.URLStreamProvider;
 import org.apache.ambari.server.controller.internal.WidgetLayoutResourceProvider;
 import org.apache.ambari.server.controller.internal.WidgetResourceProvider;
+import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
 import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.customactions.ActionDefinition;
@@ -4715,6 +4716,14 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     return injector.getInstance(TimelineMetricCacheProvider.class);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MetricPropertyProviderFactory getMetricPropertyProviderFactory() {
+    return injector.getInstance(MetricPropertyProviderFactory.class);
+  }
+
   @Override
   public KerberosHelper getKerberosHelper() {
     return kerberosHelper;

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 99a6cab..d17a451 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -166,6 +166,16 @@ import com.sun.jersey.spi.container.servlet.ServletContainer;
 public class AmbariServer {
   private static Logger LOG = LoggerFactory.getLogger(AmbariServer.class);
 
+  /**
+   * The thread name prefix for threads handling agent requests.
+   */
+  private static final String AGENT_THREAD_POOL_NAME = "ambari-agent-thread";
+
+  /**
+   * The thread name prefix for threads handling REST API requests.
+   */
+  private static final String CLIENT_THREAD_POOL_NAME = "ambari-client-thread";
+
   // Set velocity logger
   protected static final String VELOCITY_LOG_CATEGORY = "VelocityLogger";
 
@@ -458,7 +468,7 @@ public class AmbariServer {
 
         // Agent Jetty thread pool
         configureJettyThreadPool(serverForAgent, sslConnectorOneWay.getAcceptors(),
-            "qtp-ambari-agent", configs.getAgentThreadPoolSize());
+            AGENT_THREAD_POOL_NAME, configs.getAgentThreadPoolSize());
 
         serverForAgent.addConnector(sslConnectorOneWay);
         serverForAgent.addConnector(sslConnectorTwoWay);
@@ -468,7 +478,7 @@ public class AmbariServer {
         agentConnector.setIdleTimeout(configs.getConnectionMaxIdleTime());
 
         // Agent Jetty thread pool
-        configureJettyThreadPool(serverForAgent, agentConnector.getAcceptors(), "qtp-ambari-agent",
+        configureJettyThreadPool(serverForAgent, agentConnector.getAcceptors(), AGENT_THREAD_POOL_NAME,
             configs.getAgentThreadPoolSize());
 
         serverForAgent.addConnector(agentConnector);
@@ -575,7 +585,7 @@ public class AmbariServer {
       }
 
       // Client Jetty thread pool
-      configureJettyThreadPool(server, apiConnector.getAcceptors(), "qtp-ambari-client", configs.getClientThreadPoolSize());
+      configureJettyThreadPool(server, apiConnector.getAcceptors(), CLIENT_THREAD_POOL_NAME, configs.getClientThreadPoolSize());
       server.addConnector(apiConnector);
 
       server.setStopAtShutdown(true);

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index 617553b..e0bda13 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -70,6 +70,7 @@ import org.apache.ambari.server.controller.internal.MemberResourceProvider;
 import org.apache.ambari.server.controller.internal.RepositoryVersionResourceProvider;
 import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
 import org.apache.ambari.server.controller.internal.UpgradeResourceProvider;
+import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
 import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
 import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
@@ -143,7 +144,6 @@ import org.springframework.security.web.AuthenticationEntryPoint;
 import org.springframework.util.ClassUtils;
 import org.springframework.web.filter.DelegatingFilterProxy;
 
-import com.google.common.util.concurrent.AbstractScheduledService;
 import com.google.common.util.concurrent.ServiceManager;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -456,6 +456,7 @@ public class ControllerModule extends AbstractModule {
     install(new FactoryModuleBuilder().build(RequestFactory.class));
     install(new FactoryModuleBuilder().build(StackManagerFactory.class));
     install(new FactoryModuleBuilder().build(ExecutionCommandWrapperFactory.class));
+    install(new FactoryModuleBuilder().build(MetricPropertyProviderFactory.class));
 
     bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class);
     bind(SecurityHelper.class).toInstance(SecurityHelperImpl.getInstance());
@@ -528,9 +529,9 @@ public class ControllerModule extends AbstractModule {
       // Ambari services are registered with Guava
       if (null != clazz.getAnnotation(AmbariService.class)) {
         // safety check to ensure it's actually a Guava service
-        if (!AbstractScheduledService.class.isAssignableFrom(clazz)) {
+        if (!com.google.common.util.concurrent.Service.class.isAssignableFrom(clazz)) {
           String message = MessageFormat.format(
-              "Unable to register service {0} because it is not an AbstractScheduledService",
+              "Unable to register service {0} because it is not a Service which can be scheduled",
               clazz);
 
           LOG.warn(message);
@@ -538,10 +539,10 @@ public class ControllerModule extends AbstractModule {
         }
 
         // instantiate the service, register as singleton via toInstance()
-        AbstractScheduledService service = null;
+        com.google.common.util.concurrent.Service service = null;
         try {
-          service = (AbstractScheduledService) clazz.newInstance();
-          bind((Class<AbstractScheduledService>) clazz).toInstance(service);
+          service = (com.google.common.util.concurrent.Service) clazz.newInstance();
+          bind((Class<com.google.common.util.concurrent.Service>) clazz).toInstance(service);
           services.add(service);
           LOG.debug("Registering service {} ", clazz);
         } catch (Exception exception) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
index 04a8f0a..d747961 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
@@ -43,9 +43,9 @@ import org.apache.ambari.server.controller.HostResponse;
 import org.apache.ambari.server.controller.ServiceComponentHostRequest;
 import org.apache.ambari.server.controller.ServiceComponentHostResponse;
 import org.apache.ambari.server.controller.jmx.JMXHostProvider;
-import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
 import org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
 import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
@@ -172,11 +172,11 @@ public abstract class AbstractProviderModule implements ProviderModule,
     initPropMap = new HashMap<String, String[]>();
     initPropMap.put("NODEMANAGER", new String[]{"yarn.http.policy"});
     jmxDesiredProperties.put("NODEMANAGER", initPropMap);
-    
+
     initPropMap = new HashMap<String, String[]>();
     initPropMap.put("HISTORYSERVER", new String[]{"mapreduce.jobhistory.http.policy"});
     jmxDesiredProperties.put("HISTORYSERVER", initPropMap);
-        
+
     initPropMap = new HashMap<String, String[]>();
     initPropMap.put("client", new String[]{"dfs.namenode.rpc-address"});
     initPropMap.put("datanode", new String[]{"dfs.namenode.servicerpc-address"});
@@ -213,6 +213,13 @@ public abstract class AbstractProviderModule implements ProviderModule,
   TimelineMetricCacheProvider metricCacheProvider;
 
   /**
+   * A factory used to retrieve Guice-injected instances of a metric
+   * {@link PropertyProvider}.
+   */
+  @Inject
+  private MetricPropertyProviderFactory metricPropertyProviderFactory;
+
+  /**
    * The map of host components.
    */
   private Map<String, Map<String, String>> clusterHostComponentMap;
@@ -248,9 +255,14 @@ public abstract class AbstractProviderModule implements ProviderModule,
     if (managementController == null) {
       managementController = AmbariServer.getController();
     }
+
     if (metricCacheProvider == null && managementController != null) {
       metricCacheProvider = managementController.getTimelineMetricCacheProvider();
     }
+
+    if (metricPropertyProviderFactory == null && managementController != null) {
+      metricPropertyProviderFactory = managementController.getMetricPropertyProviderFactory();
+    }
   }
 
 
@@ -1090,7 +1102,8 @@ public abstract class AbstractProviderModule implements ProviderModule,
                                                      String componentNamePropertyId,
                                                      String statePropertyId) {
 
-    return new JMXPropertyProvider(PropertyHelper.getJMXPropertyIds(type), streamProvider,
+    return metricPropertyProviderFactory.createJMXPropertyProvider(
+        PropertyHelper.getJMXPropertyIds(type), streamProvider,
         jmxHostProvider, metricsHostProvider, clusterNamePropertyId, hostNamePropertyId,
         componentNamePropertyId, statePropertyId);
   }
@@ -1207,7 +1220,7 @@ public abstract class AbstractProviderModule implements ProviderModule,
   }
 
   private String getJMXProtocolString(String value) {
-    if (value.equals(PROPERTY_HDFS_HTTP_POLICY_VALUE_HTTPS_ONLY)) {
+    if (PROPERTY_HDFS_HTTP_POLICY_VALUE_HTTPS_ONLY.equals(value)) {
       return "https";
     } else {
       return "http";

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
index 6c40d14..f5040e5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
@@ -17,15 +17,25 @@
  */
 package org.apache.ambari.server.controller.internal;
 
-import com.google.inject.Inject;
-import com.google.inject.Injector;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.controller.jmx.JMXHostProvider;
 import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
 import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
+import org.apache.ambari.server.controller.metrics.RestMetricsPropertyProvider;
 import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.PropertyProvider;
@@ -42,15 +52,8 @@ import org.apache.ambari.server.state.stack.MetricDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
 
 /**
  * This class analyzes a service's metrics to determine if additional
@@ -62,11 +65,20 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
 
   @Inject
   private static Clusters clusters = null;
+
   @Inject
   private static AmbariMetaInfo metaInfo = null;
+
   @Inject
   private static Injector injector = null;
 
+  /**
+   * A factory used to retrieve Guice-injected instances of a metric
+   * {@link PropertyProvider}.
+   */
+  @Inject
+  private static MetricPropertyProviderFactory metricPropertyProviderFactory;
+
   private Resource.Type type = null;
   private String clusterNamePropertyId = null;
   private String hostNamePropertyId = null;
@@ -92,6 +104,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
   public static void init(Injector injector) {
     clusters = injector.getInstance(Clusters.class);
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
+    metricPropertyProviderFactory = injector.getInstance(MetricPropertyProviderFactory.class);
     StackDefinedPropertyProvider.injector = injector;
   }
 
@@ -108,12 +121,14 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
                                       PropertyProvider defaultGangliaPropertyProvider) {
 
     this.metricHostProvider = metricHostProvider;
-    this.metricsServiceProvider = serviceProvider;
+    metricsServiceProvider = serviceProvider;
 
-    if (null == clusterPropertyId)
+    if (null == clusterPropertyId) {
       throw new NullPointerException("Cluster name property id cannot be null");
-    if (null == componentPropertyId)
+    }
+    if (null == componentPropertyId) {
       throw new NullPointerException("Component name property id cannot be null");
+    }
 
     this.type = type;
 
@@ -152,8 +167,9 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
         List<MetricDefinition> defs = metaInfo.getMetrics(
             stack.getStackName(), stack.getStackVersion(), svc, componentName, type.name());
 
-        if (null == defs || 0 == defs.size())
+        if (null == defs || 0 == defs.size()) {
           continue;
+        }
 
         for (MetricDefinition m : defs) {
           if (m.getType().equals("ganglia")) {
@@ -192,7 +208,8 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
       }
 
       if (jmxMap.size() > 0) {
-        JMXPropertyProvider jpp = new JMXPropertyProvider(jmxMap, streamProvider,
+        JMXPropertyProvider jpp = metricPropertyProviderFactory.createJMXPropertyProvider(jmxMap,
+            streamProvider,
             jmxHostProvider, metricHostProvider,
             clusterNamePropertyId, hostNamePropertyId,
             componentNamePropertyId, resourceStatePropertyId);
@@ -308,12 +325,21 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
 
     try {
       Class<?> clz = Class.forName(definition.getType());
+
+      // use a Factory for the REST provider
+      if (clz.equals(RestMetricsPropertyProvider.class)) {
+        return metricPropertyProviderFactory.createRESTMetricsPropertyProvider(
+            definition.getProperties(), componentMetrics, streamProvider, metricsHostProvider,
+            clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId, statePropertyId,
+            componentName);
+      }
+
       try {
          /*
          * Warning: this branch is already used, that's why please adjust
          * all implementations when modifying constructor interface
          */
-        Constructor<?> ct = clz.getConstructor(Injector.class, Map.class,
+        Constructor<?> ct = clz.getConstructor(Map.class,
             Map.class, StreamProvider.class, MetricHostProvider.class,
             String.class, String.class, String.class, String.class, String.class);
         Object o = ct.newInstance(

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
index 1ccc5df..e1a5e22 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
@@ -18,6 +18,19 @@
 
 package org.apache.ambari.server.controller.jmx;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
 import org.apache.ambari.server.controller.metrics.MetricHostProvider;
@@ -27,26 +40,43 @@ import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.utilities.StreamProvider;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.type.TypeReference;
+import org.apache.ambari.server.state.services.MetricsRetrievalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
 
 /**
- * Property provider implementation for JMX sources.
+ * The {@link JMXPropertyProvider} is used to retrieve JMX metrics from a given
+ * {@link Request}. This class will delegate responsibility for actually
+ * retrieving JMX data from a remote URL to the {@link MetricsRetrievalService}.
+ * It will also leverage the {@link MetricsRetrievalService} to provide cached
+ * {@link JMXMetricHolder} instances for given URLs.
+ * <p/>
+ * This is because the REST API workflow will attempt to read data from this
+ * provider during the context of a live Jetty thread. As a result, any attempt
+ * to read remote resources will cause a delay in returning a response code. On
+ * small clusters this mormally isn't a problem. However, as the cluster
+ * increases in size, the thread pool would not be able to keep pace and would
+ * eventually cause REST API request threads to wait while remote JMX data is
+ * retrieved.
+ * <p/>
+ * In general, this type of federated data collection is a poor design. Even
+ * with a large enough threadpool there are simple use cases where the model
+ * breaks down:
+ * <ul>
+ * <li>Concurrent users logged in, each creating their own requests and
+ * exhausting the threadpool
+ * <li>Misbehaving JMX endpoints which don't respond in a timely manner
+ * </ul>
+ * <p/>
+ * For these reasons the {@link JMXPropertyProvider} will use a completely
+ * asynchronous model through the {@link MetricsRetrievalService}. It should be
+ * noted that this provider is still an instance of a
+ * {@link ThreadPoolEnabledPropertyProvider} due to the nature of how the cached
+ * {@link JMXMetricHolder} instances need to be looped over an parsed.
  */
 public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
 
@@ -54,9 +84,6 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
   private static final String PORT_KEY = "tag.port";
   private static final String DOT_REPLACEMENT_CHAR = "#";
 
-  private final static ObjectReader jmxObjectReader;
-  private final static ObjectReader stormObjectReader;
-
   private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
 
   static {
@@ -69,16 +96,6 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
     DEFAULT_JMX_PORTS.put("NODEMANAGER",         "8042");
     DEFAULT_JMX_PORTS.put("JOURNALNODE",         "8480");
     DEFAULT_JMX_PORTS.put("STORM_REST_API",      "8745");
-
-    ObjectMapper jmxObjectMapper = new ObjectMapper();
-    jmxObjectMapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
-    jmxObjectReader = jmxObjectMapper.reader(JMXMetricHolder.class);
-
-    TypeReference<HashMap<String,Object>> typeRef
-            = new TypeReference<
-            HashMap<String,Object>
-            >() {};
-    stormObjectReader = jmxObjectMapper.reader(typeRef);
   }
 
   protected final static Logger LOG =
@@ -101,6 +118,13 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
 
   private final Map<String, String> clusterComponentPortsMap;
 
+  /**
+   * Used to submit asynchronous requests for remote metrics as well as querying
+   * cached metrics.
+   */
+  @Inject
+  private MetricsRetrievalService metricsRetrievalService;
+
   // ----- Constructors ------------------------------------------------------
 
   /**
@@ -115,14 +139,16 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
    * @param componentNamePropertyId  the component name property id
    * @param statePropertyId          the state property id
    */
-  public JMXPropertyProvider(Map<String, Map<String, PropertyInfo>> componentMetrics,
-                             StreamProvider streamProvider,
-                             JMXHostProvider jmxHostProvider,
-                             MetricHostProvider metricHostProvider,
-                             String clusterNamePropertyId,
-                             String hostNamePropertyId,
-                             String componentNamePropertyId,
-                             String statePropertyId) {
+  @AssistedInject
+  JMXPropertyProvider(
+      @Assisted("componentMetrics") Map<String, Map<String, PropertyInfo>> componentMetrics,
+      @Assisted("streamProvider") StreamProvider streamProvider,
+      @Assisted("jmxHostProvider") JMXHostProvider jmxHostProvider,
+      @Assisted("metricHostProvider") MetricHostProvider metricHostProvider,
+      @Assisted("clusterNamePropertyId") String clusterNamePropertyId,
+      @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId,
+      @Assisted("componentNamePropertyId") String componentNamePropertyId,
+      @Assisted("statePropertyId") @Nullable String statePropertyId) {
 
     super(componentMetrics, hostNamePropertyId, metricHostProvider, clusterNamePropertyId);
 
@@ -132,7 +158,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
     this.hostNamePropertyId       = hostNamePropertyId;
     this.componentNamePropertyId  = componentNamePropertyId;
     this.statePropertyId          = statePropertyId;
-    this.clusterComponentPortsMap = new HashMap<>();
+    clusterComponentPortsMap = new HashMap<>();
   }
 
   // ----- helper methods ----------------------------------------------------
@@ -174,6 +200,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
         unsupportedIds.add(id);
       }
     }
+
     ids.removeAll(unsupportedIds);
 
     if (ids.isEmpty()) {
@@ -205,53 +232,50 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
       return resource;
     }
 
-    InputStream in = null;
-
     String spec = null;
-    try {
+    for (String hostName : hostNames) {
       try {
-        for (String hostName : hostNames) {
-          try {
-            String port = getPort(clusterName, componentName, hostName, httpsEnabled);
-            if (port == null) {
-              LOG.warn("Unable to get JMX metrics.  No port value for " + componentName);
-              return resource;
-            }
-            spec = getSpec(protocol, hostName, port, "/jmx");
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Spec: " + spec);
-            }
-            in = streamProvider.readFrom(spec);
-            // if the ticket becomes invalid (timeout) then bail out
-            if (!ticket.isValid()) {
-              return resource;
-            }
+        String port = getPort(clusterName, componentName, hostName, httpsEnabled);
+        if (port == null) {
+          LOG.warn("Unable to get JMX metrics.  No port value for " + componentName);
+          return resource;
+        }
 
-            getHadoopMetricValue(in, ids, resource, request, ticket);
+        // build the URL
+        String jmxUrl = getSpec(protocol, hostName, port, "/jmx");
 
-          } catch (IOException e) {
-            AmbariException detailedException = new AmbariException(
-                String.format("Unable to get JMX metrics from the host %s for the component %s. Spec: %s", hostName, componentName, spec), e);
-            logException(detailedException);
-          }
+        // always submit a request to cache the latest data
+        metricsRetrievalService.submitJMXRequest(streamProvider, jmxUrl);
+
+        // check to see if there is a cached value and use it if there is
+        JMXMetricHolder jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(jmxUrl);
+        if (null == jmxMetricHolder) {
+          return resource;
         }
-      } finally {
-        if (in != null) {
-          in.close();
+
+        // if the ticket becomes invalid (timeout) then bail out
+        if (!ticket.isValid()) {
+          return resource;
         }
+
+        getHadoopMetricValue(jmxMetricHolder, ids, resource, request, ticket);
+
+      } catch (IOException e) {
+        AmbariException detailedException = new AmbariException(String.format(
+            "Unable to get JMX metrics from the host %s for the component %s. Spec: %s", hostName,
+            componentName, spec), e);
+        logException(detailedException);
       }
-    } catch (IOException e) {
-      logException(e);
     }
+
     return resource;
   }
 
   /**
    * Hadoop-specific metrics fetching
    */
-  private void getHadoopMetricValue(InputStream in, Set<String> ids,
+  private void getHadoopMetricValue(JMXMetricHolder metricHolder, Set<String> ids,
                        Resource resource, Request request, Ticket ticket) throws IOException {
-    JMXMetricHolder metricHolder = jmxObjectReader.readValue(in);
 
     Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
     String componentName = (String) resource.getPropertyValue(componentNamePropertyId);
@@ -377,7 +401,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
   private String getJMXProtocol(String clusterName, String componentName) {
     return jmxHostProvider.getJMXProtocol(clusterName, componentName);
   }
-  
+
   private Set<String> getHosts(Resource resource, String clusterName, String componentName) {
     return hostNamePropertyId == null ?
             jmxHostProvider.getHostNames(clusterName, componentName) :

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java
new file mode 100644
index 0000000..0882ca2
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricPropertyProviderFactory.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.jmx.JMXHostProvider;
+import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
+import org.apache.ambari.server.controller.spi.PropertyProvider;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * The {@link MetricPropertyProviderFactory} is used to provide injected
+ * instances of {@link PropertyProvider}s which retrieve metrics.
+ */
+public interface MetricPropertyProviderFactory {
+
+  /**
+   * Gets a Guice-inject instance of a {@link JMXPropertyProvider}.
+   *
+   * @param componentMetrics
+   *          the map of supported metrics
+   * @param streamProvider
+   *          the stream provider
+   * @param jmxHostProvider
+   *          the JMX host mapping
+   * @param metricHostProvider
+   *          the host mapping
+   * @param clusterNamePropertyId
+   *          the cluster name property id
+   * @param hostNamePropertyId
+   *          the host name property id
+   * @param componentNamePropertyId
+   *          the component name property id
+   * @param statePropertyId
+   *          the state property id
+   * @return the instantiated and injected {@link JMXPropertyProvider}.
+   */
+  JMXPropertyProvider createJMXPropertyProvider(
+      @Assisted("componentMetrics") Map<String, Map<String, PropertyInfo>> componentMetrics,
+      @Assisted("streamProvider") StreamProvider streamProvider,
+      @Assisted("jmxHostProvider") JMXHostProvider jmxHostProvider,
+      @Assisted("metricHostProvider") MetricHostProvider metricHostProvider,
+      @Assisted("clusterNamePropertyId") String clusterNamePropertyId,
+      @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId,
+      @Assisted("componentNamePropertyId") String componentNamePropertyId,
+      @Assisted("statePropertyId") @Nullable String statePropertyId);
+
+  /**
+   * /** Create a REST property provider.
+   *
+   * @param metricsProperties
+   *          the map of per-component metrics properties
+   * @param componentMetrics
+   *          the map of supported metrics for component
+   * @param streamProvider
+   *          the stream provider
+   * @param metricHostProvider
+   *          metricsHostProvider instance
+   * @param clusterNamePropertyId
+   *          the cluster name property id
+   * @param hostNamePropertyId
+   *          the host name property id, or {@code null} if none.
+   * @param componentNamePropertyId
+   *          the component name property id
+   * @param statePropertyId
+   *          the state property id or {@code null} if none.
+   * @param componentName
+   *          the name of the component which the metric is for, or {@code null}
+   *          if none.
+   * @return the instantiated and injected {@link RestMetricsPropertyProvider}.
+   */
+  RestMetricsPropertyProvider createRESTMetricsPropertyProvider(
+      @Assisted("metricsProperties") Map<String, String> metricsProperties,
+      @Assisted("componentMetrics") Map<String, Map<String, PropertyInfo>> componentMetrics,
+      @Assisted("streamProvider") StreamProvider streamProvider,
+      @Assisted("metricHostProvider") MetricHostProvider metricHostProvider,
+      @Assisted("clusterNamePropertyId") String clusterNamePropertyId,
+      @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId,
+      @Assisted("componentNamePropertyId") String componentNamePropertyId,
+      @Assisted("statePropertyId") @Nullable String statePropertyId,
+      @Assisted("componentName") @Nullable String componentName);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
index 6f2a134..cbe827a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
@@ -19,12 +19,15 @@
 
 package org.apache.ambari.server.controller.metrics;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
@@ -36,48 +39,63 @@ import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.utilities.StreamProvider;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.services.MetricsRetrievalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.reflect.TypeToken;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
 
 /**
- * WARNING: Class should be thread-safe!
+ * Resolves metrics like api/cluster/summary/nimbus.uptime For every metric,
+ * finds a relevant JSON value and returns it as a resource property.
+ * <p/>
+ * This class will delegate responsibility for actually retrieving JSON data
+ * from a remote URL to the {@link MetricsRetrievalService}. It will also
+ * leverage the {@link MetricsRetrievalService} to provide cached {@link Map}
+ * instances for given URLs.
  * <p/>
- * Resolves metrics like api/cluster/summary/nimbus.uptime
- * For every metric, finds a relevant JSON value and returns is as
- * a resource property.
+ * This is because the REST API workflow will attempt to read data from this
+ * provider during the context of a live Jetty thread. As a result, any attempt
+ * to read remote resources will cause a delay in returning a response code. On
+ * small clusters this mormally isn't a problem. However, as the cluster
+ * increases in size, the thread pool would not be able to keep pace and would
+ * eventually cause REST API request threads to wait while remote JSON data is
+ * retrieved.
  */
 public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvider {
 
   protected final static Logger LOG =
       LoggerFactory.getLogger(RestMetricsPropertyProvider.class);
 
-  private static Map<String, RestMetricsPropertyProvider> instances =
-      new Hashtable<String, RestMetricsPropertyProvider>();
-
   @Inject
   private AmbariManagementController amc;
 
   @Inject
   private Clusters clusters;
 
+  /**
+   * Used to parse the REST JSON metrics.
+   */
+  @Inject
+  private Gson gson;
+
+  /**
+   * Used to submit asynchronous requests for remote metrics as well as querying
+   * cached metrics.
+   */
+  @Inject
+  private MetricsRetrievalService metricsRetrievalService;
+
   private final Map<String, String> metricsProperties;
   private final StreamProvider streamProvider;
   private final String clusterNamePropertyId;
   private final String componentNamePropertyId;
   private final String statePropertyId;
-  private MetricHostProvider metricHostProvider;
   private final String componentName;
 
   private static final String DEFAULT_PORT_PROPERTY = "default_port";
@@ -116,26 +134,23 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid
    * @param componentNamePropertyId the component name property id
    * @param statePropertyId         the state property id
    */
-  public RestMetricsPropertyProvider(
-    Injector injector,
-    Map<String, String> metricsProperties,
-    Map<String, Map<String, PropertyInfo>> componentMetrics,
-    StreamProvider streamProvider,
-    MetricHostProvider metricHostProvider,
-    String clusterNamePropertyId,
-    String hostNamePropertyId,
-    String componentNamePropertyId,
-    String statePropertyId,
-    String componentName){
-
+  @AssistedInject
+  RestMetricsPropertyProvider(
+      @Assisted("metricsProperties") Map<String, String> metricsProperties,
+      @Assisted("componentMetrics") Map<String, Map<String, PropertyInfo>> componentMetrics,
+      @Assisted("streamProvider") StreamProvider streamProvider,
+      @Assisted("metricHostProvider") MetricHostProvider metricHostProvider,
+      @Assisted("clusterNamePropertyId") String clusterNamePropertyId,
+      @Assisted("hostNamePropertyId") @Nullable String hostNamePropertyId,
+      @Assisted("componentNamePropertyId") String componentNamePropertyId,
+      @Assisted("statePropertyId") @Nullable String statePropertyId,
+      @Assisted("componentName") @Nullable String componentName) {
     super(componentMetrics, hostNamePropertyId, metricHostProvider, clusterNamePropertyId);
     this.metricsProperties = metricsProperties;
     this.streamProvider = streamProvider;
     this.clusterNamePropertyId = clusterNamePropertyId;
     this.componentNamePropertyId = componentNamePropertyId;
     this.statePropertyId = statePropertyId;
-    this.metricHostProvider = metricHostProvider;
-    injector.injectMembers(this);
     this.componentName = componentName;
   }
 
@@ -223,24 +238,28 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid
     HashMap<String, Set<String>> urls = extractPropertyURLs(resultIds, propertyInfos);
 
     for (String url : urls.keySet()) {
-      String spec = null;
+      String spec = getSpec(protocol, hostname, port, url);
+
+      // always submit a request to cache the latest data
+      metricsRetrievalService.submitRESTRequest(streamProvider, spec);
+
+      // check to see if there is a cached value and use it if there is
+      Map<String, String> jsonMap = metricsRetrievalService.getCachedRESTMetric(spec);
+      if (null == jsonMap) {
+        return resource;
+      }
+
+      if (!ticket.isValid()) {
+        return resource;
+      }
+
       try {
-        spec = getSpec(protocol, hostname, port, url);
-        InputStream in = streamProvider.readFrom(spec);
-        if (!ticket.isValid()) {
-          if (in != null) {
-            in.close();
-          }
-          return resource;
-        }       
-        try {
-          extractValuesFromJSON(in, urls.get(url), resource, propertyInfos);
-        } finally {
-            in.close();
-          }
-      } catch (IOException e) {
-        AmbariException detailedException = new AmbariException(
-            String.format("Unable to get REST metrics from the host %s for the component %s. Spec: %s", hostname, resourceComponentName, spec), e);
+        extractValuesFromJSON(jsonMap, urls.get(url), resource, propertyInfos);
+      } catch (AmbariException ambariException) {
+        AmbariException detailedException = new AmbariException(String.format(
+            "Unable to get REST metrics from the for %s at $s", resourceComponentName, spec),
+            ambariException);
+
         logException(detailedException);
       }
     }
@@ -418,25 +437,17 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid
 
 
   /**
-   * Extracts requested properties from a given JSON input stream into
-   * resource.
+   * Extracts requested properties from a parsed {@link Map} of {@link String}.
    *
-   * @param jsonStream           input stream that contains JSON
-   * @param requestedPropertyIds a set of property IDs
-   *                             that should be fetched for this URL
-   * @param resource             all extracted values are placed into resource
+   * @param requestedPropertyIds
+   *          a set of property IDs that should be fetched for this URL
+   * @param resource
+   *          all extracted values are placed into resource
    */
-  private void extractValuesFromJSON(InputStream jsonStream,
-                                     Set<String> requestedPropertyIds,
-                                     Resource resource,
-                                     Map<String, PropertyInfo> propertyInfos)
-      throws IOException {
-    Gson gson = new Gson();
-    Type type = new TypeToken<Map<Object, Object>>() {
-    }.getType();
-    JsonReader jsonReader = new JsonReader(
-        new BufferedReader(new InputStreamReader(jsonStream)));
-    Map<String, String> jsonMap = gson.fromJson(jsonReader, type);
+  private void extractValuesFromJSON(Map<String, String> jsonMap,
+      Set<String> requestedPropertyIds, Resource resource, Map<String, PropertyInfo> propertyInfos)
+      throws AmbariException {
+    Type type = new TypeToken<Map<Object, Object>>() {}.getType();
     for (String requestedPropertyId : requestedPropertyIds) {
       PropertyInfo propertyInfo = propertyInfos.get(requestedPropertyId);
       String metricsPath = propertyInfo.getPropertyId();
@@ -450,7 +461,8 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid
               "Can not fetch %dth element of document path (%s) " +
                   "from json. Wrong metrics path: %s",
               i, pathElement, metricsPath);
-          throw new IOException(message);
+
+          throw new AmbariException(message);
         }
         Object jsonSubElement = jsonMap.get(pathElement);
         if (i == docPath.length - 1) { // Reached target document section

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
index 6f4a6ea..367c339 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
@@ -18,19 +18,6 @@
 
 package org.apache.ambari.server.controller.metrics;
 
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
-import org.apache.ambari.server.controller.internal.PropertyInfo;
-import org.apache.ambari.server.controller.spi.Predicate;
-import org.apache.ambari.server.controller.spi.Request;
-import org.apache.ambari.server.controller.spi.Resource;
-import org.apache.ambari.server.controller.spi.SystemException;
-import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
-import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService;
-
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -39,17 +26,57 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import javax.inject.Inject;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.PropertyProvider;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
+
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
 
 /**
- * Unites common functionality for multithreaded metrics providers
- * (JMX and REST as of now). Shares the same pool of executor threads.
+ * Unites common functionality for multithreaded metrics providers (JMX and REST
+ * as of now). Shares the same pool of executor threads across all
+ * implementations.
+ * <p/>
+ * <b>This {@link PropertyProvider} should not be mistaken for a way to perform
+ * expensive operations, as it is still called as part of the incoming REST
+ * Jetty request.</b> It is poor design to have UI threads from the web client
+ * waiting on expensive operations from a {@link PropertyProvider}, even if they
+ * are spread across multiple threads.
+ * <p/>
+ * Instead, this {@link PropertyProvider} is useful for spreading many small,
+ * quick operations across a threadpool. This is why the known implementations
+ * of this class (such as the {@link JMXPropertyProvider}) use a cache instead
+ * of reaching out to network endpoints on their own.
+ * <p/>
+ * This is also why the {@link ThreadPoolExecutor} used here has an unbounded
+ * worker queue and essentially a fixed core size to perform its work. When
+ * {@link Callable}s are rejected because of a worker queue exhaustion, they are
+ * never submitted for execution, yet the {@link Future} instance is still
+ * returned. Therefore, if the queue is ever exhausted, incoming REST API
+ * requests must wait the entire {@link CompletionService#poll(long, TimeUnit)}
+ * timeout before skipping the result and returning control.
+ *
  */
 public abstract class ThreadPoolEnabledPropertyProvider extends AbstractPropertyProvider {
 
+  protected static Configuration configuration;
+
   /**
    * Host states that make available metrics collection
    */
@@ -59,29 +86,24 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
   private final String clusterNamePropertyId;
 
   /**
-   * Executor service is shared between all childs of current class
+   * Executor service is shared between all instances.
    */
   private static ThreadPoolExecutor EXECUTOR_SERVICE;
   private static int THREAD_POOL_CORE_SIZE;
   private static int THREAD_POOL_MAX_SIZE;
+  private static int THREAD_POOL_WORKER_QUEUE_SIZE;
+  private static long COMPLETION_SERVICE_POLL_TIMEOUT;
   private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
 
   @Inject
   public static void init(Configuration configuration) {
     THREAD_POOL_CORE_SIZE = configuration.getPropertyProvidersThreadPoolCoreSize();
     THREAD_POOL_MAX_SIZE = configuration.getPropertyProvidersThreadPoolMaxSize();
+    THREAD_POOL_WORKER_QUEUE_SIZE = configuration.getPropertyProvidersWorkerQueueSize();
+    COMPLETION_SERVICE_POLL_TIMEOUT = configuration.getPropertyProvidersCompletionServiceTimeout();
     EXECUTOR_SERVICE = initExecutorService();
   }
 
-  private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
-  /**
-   * The amount of time that this provider will wait for JMX metric values to be
-   * returned from the JMX sources.  If no results are returned for this amount of
-   * time then the request to populate the resources will fail.
-   */
-  protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS;
-  public static final String TIMED_OUT_MSG = "Timed out waiting for metrics.";
-
   private static final Cache<String, Throwable> exceptionsCache = CacheBuilder.newBuilder()
           .expireAfterWrite(5, TimeUnit.MINUTES)
           .build();
@@ -114,8 +136,15 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
             THREAD_POOL_CORE_SIZE,
             THREAD_POOL_MAX_SIZE,
             THREAD_POOL_TIMEOUT_MILLIS,
-            TimeUnit.MILLISECONDS);
+            TimeUnit.MILLISECONDS,
+            THREAD_POOL_WORKER_QUEUE_SIZE);
     threadPoolExecutor.allowCoreThreadTimeOut(true);
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
+        "ambari-property-provider-thread-%d").build();
+
+    threadPoolExecutor.setThreadFactory(threadFactory);
+
     return threadPoolExecutor;
   }
 
@@ -128,36 +157,47 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
     if(!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) {
       return resources;
     }
+
     // Get a valid ticket for the request.
-    Ticket ticket = new Ticket();
+    final Ticket ticket = new Ticket();
 
-    CompletionService<Resource> completionService =
+    // in most cases, the buffered completion service will not be utlized for
+    // its advantages since the worker queue is unbounded. However, if is
+    // configured with a boundary, then the buffered service ensures that no
+    // requests are discarded.
+    final CompletionService<Resource> completionService =
         new BufferedThreadPoolExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
 
     // In a large cluster we could have thousands of resources to populate here.
     // Distribute the work across multiple threads.
     for (Resource resource : resources) {
-      completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket));
+      completionService.submit(
+          getPopulateResourceCallable(resource, request, predicate, ticket));
     }
 
     Set<Resource> keepers = new HashSet<Resource>();
     try {
-      for (int i = 0; i < resources.size(); ++ i) {
-        Future<Resource> resourceFuture =
-            completionService.poll(populateTimeout, TimeUnit.MILLISECONDS);
+      for (int i = 0; i < resources.size(); ++i) {
+        Future<Resource> resourceFuture = completionService.poll(COMPLETION_SERVICE_POLL_TIMEOUT,
+            TimeUnit.MILLISECONDS);
 
         if (resourceFuture == null) {
-          // its been more than the populateTimeout since the last callable completed ...
-          // invalidate the ticket to abort the threads and don't wait any longer
+          // its been more than the populateTimeout since the last callable
+          // completed ...
+          // invalidate the ticket to abort the threads and don't wait any
+          // longer
           ticket.invalidate();
-          LOG.error(TIMED_OUT_MSG);
+          LOG.error("Timed out after waiting {}ms waiting for request {}",
+              COMPLETION_SERVICE_POLL_TIMEOUT, request);
+
+          // stop iterating
           break;
-        } else {
-          // future should already be completed... no need to wait on get
-          Resource resource = resourceFuture.get();
-          if (resource != null) {
-            keepers.add(resource);
-          }
+        }
+
+        // future should already be completed... no need to wait on get
+        Resource resource = resourceFuture.get();
+        if (resource != null) {
+          keepers.add(resource);
         }
       }
     } catch (InterruptedException e) {
@@ -165,6 +205,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
     } catch (ExecutionException e) {
       rethrowSystemException(e.getCause());
     }
+
     return keepers;
   }
 
@@ -181,6 +222,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
   private Callable<Resource> getPopulateResourceCallable(
       final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) {
     return new Callable<Resource>() {
+      @Override
       public Resource call() throws SystemException {
         return populateResource(resource, request, predicate, ticket);
       }
@@ -211,8 +253,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
 
 
   protected void setPopulateTimeout(long populateTimeout) {
-    this.populateTimeout = populateTimeout;
-
+    COMPLETION_SERVICE_POLL_TIMEOUT = populateTimeout;
   }
 
 
@@ -247,7 +288,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
    * @return the error message that was logged
    */
   protected static String logException(final Throwable throwable) {
-    final String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage();
+    final String msg = "Caught exception getting metrics : " + throwable.getLocalizedMessage();
 
     // JsonParseException includes InputStream's hash code into the message.
     // getMessage and printStackTrace returns a different String every time.

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
index 4d6daa6..9f31fef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
@@ -30,13 +30,18 @@ import java.util.concurrent.TimeUnit;
  * and uses its thread pool to execute tasks - buffering any tasks that
  * overflow. Such buffered tasks are later re-submitted to the executor when
  * finished tasks are polled or taken.
- * 
+ * <p/>
  * This class overrides the {@link ThreadPoolExecutor}'s
- * {@link RejectedExecutionHandler} to collect overflowing tasks.
- * 
+ * {@link RejectedExecutionHandler} to collect overflowing tasks. When
+ * {@link #poll(long, TimeUnit)} is invoked, this class will attempt to
+ * re-submit any overflow tasks before waiting the specified amount of time.
+ * This will prevent blocking on an empty completion queue since the parent
+ * {@link ExecutorCompletionService} doesn't have any idea that there are tasks
+ * waiting to be resubmitted.
+ * <p/>
  * The {@link ScalingThreadPoolExecutor} can be used in conjunction with this
  * class to provide an efficient buffered, scaling thread pool implementation.
- * 
+ *
  * @param <V>
  */
 public class BufferedThreadPoolExecutorCompletionService<V> extends ExecutorCompletionService<V> {
@@ -84,15 +89,50 @@ public class BufferedThreadPoolExecutorCompletionService<V> extends ExecutorComp
     return poll;
   }
 
+  /**
+   * {@inheritDoc}
+   * <p/>
+   * The goal of this method is to prevent blocking if there are tasks waiting
+   * in the overflow queue. In the event that the overflow queue was populated,
+   * we should not blindly wait on the parent
+   * {@link ExecutorCompletionService#poll(long, TimeUnit)} method. Instead, we
+   * should ensure that we have submitted at least one of our own tasks for
+   * completion.
+   */
   @Override
   public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
-    Future<V> poll = super.poll(timeout, unit);
-    if (!executor.isTerminating() && !overflowQueue.isEmpty() && executor.getActiveCount() < executor.getMaximumPoolSize()) {
-      Runnable overflow = overflowQueue.poll();
-      if (overflow != null) {
-        executor.execute(overflow);
+    // first poll for anything that's already completed and do a short-circuit return
+    Future<V> poll = super.poll();
+    if( null != poll ) {
+      // there's something to return; that's great, but let's also see if we can
+      // submit anything in the overflow queue back to the completion service
+      if (!executor.isTerminating() && !overflowQueue.isEmpty()
+          && executor.getActiveCount() < executor.getMaximumPoolSize()) {
+        Runnable overflow = overflowQueue.poll();
+        if (overflow != null) {
+          executor.execute(overflow);
+        }
       }
+
+      // return the future
+      return poll;
     }
+
+    // nothing completed yet, so check active thread count - if there is nothing
+    // working either, then that means that the parent completion service thinks
+    // it's done - we should submit our own tasks
+    if (executor.getActiveCount() == 0) {
+      if (!executor.isTerminating() && !overflowQueue.isEmpty()) {
+        Runnable overflow = overflowQueue.poll();
+        if (overflow != null) {
+          executor.execute(overflow);
+        }
+      }
+    }
+
+    // now that we've confirmed that either the parent completion service is
+    // still working or we submitted our own task, we can poll with a timeout
+    poll = super.poll(timeout, unit);
     return poll;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
index 7a5e479..864e3fb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
@@ -28,21 +28,45 @@ import java.util.concurrent.TimeUnit;
  * possibly several pooled threads. It also scales up the number of threads in
  * the pool if the number of submissions exceeds the core size of the pool. The
  * pool can scale up to the specified maximum pool size.
- * 
+ *
  * If the number of submissions exceeds the sum of the core and maximum size of
  * the thread pool, the submissions are then handled by the provided
  * {@link RejectedExecutionHandler}.
- * 
+ *
  * If the overflowing submissions need to be handled,
  * {@link BufferedThreadPoolExecutorCompletionService} can be used to buffer up
  * overflowing submissions for later submission as threads become available.
- * 
+ *
  * @see BufferedThreadPoolExecutorCompletionService
  */
 public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
 
-  public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
-    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(corePoolSize));
+  /**
+   * Constructor.
+   *
+   * @param corePoolSize
+   *          the number of threads which will be considered as "core". If core
+   *          thread timeout is not enabled, then this will be the minimum
+   *          number of threads, always. With core timeout enabled, then this
+   *          will be the number of threads spun up to handle incoming work
+   *          regardless of worker queue size.
+   * @param maximumPoolSize
+   *          the maximum number of threads which can be spawned when an
+   *          {@link ExecutorService} encounters a failure inserting into the
+   *          work queue. These threads are not spawned to handle entries into
+   *          the work queue; they are only spawned if the queue fills and a
+   *          {@link RejectedExecutionHandler} is invoked.
+   * @param keepAliveTime
+   *          the TTL for core threads
+   * @param unit
+   *          the time unit for core threads
+   * @param workerQueueSize
+   *          the size of the worker queue. The threads specified by
+   *          {@link #getMaximumPoolSize()} will only be created if the worker
+   *          queue is exhausted.
+   */
+  public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
+      TimeUnit unit, int workerQueueSize) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(workerQueueSize));
   }
-
 }


Mime
View raw message