ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srima...@apache.org
Subject ambari git commit: AMBARI-14699. JMX heatmap API call takes over 1 minute to complete on 900 node cluster (srimanth)
Date Fri, 22 Jan 2016 18:42:18 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk f2a44a7e1 -> 5acbc4d83


AMBARI-14699. JMX heatmap API call takes over 1 minute to complete on 900 node cluster (srimanth)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5acbc4d8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5acbc4d8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5acbc4d8

Branch: refs/heads/trunk
Commit: 5acbc4d832b0c8b14d76578c783bfb8f94523b15
Parents: f2a44a7
Author: Srimanth Gunturi <sgunturi@hortonworks.com>
Authored: Fri Jan 15 15:18:20 2016 -0800
Committer: Srimanth Gunturi <sgunturi@hortonworks.com>
Committed: Fri Jan 22 10:41:42 2016 -0800

----------------------------------------------------------------------
 .../server/configuration/Configuration.java     |  26 +++
 .../ambari/server/controller/AmbariServer.java  |   2 +
 .../controller/jmx/JMXPropertyProvider.java     |  19 +-
 .../ThreadPoolEnabledPropertyProvider.java      |  40 ++--
 .../metrics/timeline/AMSPropertyProvider.java   |  20 +-
 ...eredThreadPoolExecutorCompletionService.java |  98 ++++++++++
 .../utilities/ScalingThreadPoolExecutor.java    |  48 +++++
 .../server/configuration/ConfigurationTest.java |  15 ++
 .../StackDefinedPropertyProviderTest.java       |   4 +
 .../metrics/JMXPropertyProviderTest.java        |  12 ++
 .../RestMetricsPropertyProviderTest.java        |   4 +
 ...ThreadPoolExecutorCompletionServiceTest.java | 188 +++++++++++++++++++
 12 files changed, 451 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index c31a119..4120f34 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+
 import org.apache.ambari.annotations.Experimental;
 import org.apache.ambari.annotations.ExperimentalFeature;
 import org.apache.ambari.server.AmbariException;
@@ -436,6 +437,11 @@ public class Configuration {
   private static final String VIEW_EXTRACTION_THREADPOOL_TIMEOUT_KEY = "view.extraction.threadpool.timeout";
   private static final long VIEW_EXTRACTION_THREADPOOL_TIMEOUT_DEFAULT = 100000L;
 
+  public static final String PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY = "server.property-provider.threadpool.size.max";
+  public static final int PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_DEFAULT = 4 * Runtime.getRuntime().availableProcessors();
+  public static final String PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY = "server.property-provider.threadpool.size.core";
+  public static final int PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_DEFAULT = 2 * Runtime.getRuntime().availableProcessors();
+
   private static final String SERVER_HTTP_SESSION_INACTIVE_TIMEOUT = "server.http.session.inactive_timeout";
 
   // database pooling defaults
@@ -1920,6 +1926,26 @@ public class Configuration {
   }
 
   /**
+   * Get the property-providers' thread pool core size, read from
+   * {@link #PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY} and parsed as an int.
+   * @return the configured size; twice the available processor count if unset
+   */
+  public int getPropertyProvidersThreadPoolCoreSize() {
+    return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY,
+        String.valueOf(PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_DEFAULT)));
+  }
+
+  /**
+   * Get the property-providers' thread pool max size, read from
+   * {@link #PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY} and parsed as an int.
+   * @return the configured size; four times the available processor count if unset
+   */
+  public int getPropertyProvidersThreadPoolMaxSize() {
+    return Integer.parseInt(properties.getProperty(PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY,
+        String.valueOf(PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_DEFAULT)));
+  }
+
+  /**
    * Get the view extraction thread pool timeout.
    *
    * @return the view extraction thread pool timeout

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 02a2d57..43e2dac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -64,6 +64,7 @@ import org.apache.ambari.server.controller.internal.StackDefinedPropertyProvider
 import org.apache.ambari.server.controller.internal.StackDependencyResourceProvider;
 import org.apache.ambari.server.controller.internal.UserPrivilegeResourceProvider;
 import org.apache.ambari.server.controller.internal.ViewPermissionResourceProvider;
+import org.apache.ambari.server.controller.metrics.ThreadPoolEnabledPropertyProvider;
 import org.apache.ambari.server.controller.utilities.DatabaseChecker;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.PersistenceType;
@@ -837,6 +838,7 @@ public class AmbariServer {
     AmbariPrivilegeResourceProvider.init(injector.getInstance(ClusterDAO.class));
     ActionManager.setTopologyManager(injector.getInstance(TopologyManager.class));
     StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class));
+    ThreadPoolEnabledPropertyProvider.init(injector.getInstance(Configuration.class));
 
     RetryHelper.init(configs.getOperationsRetryAttempts());
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
index 2079e72..f8215f6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
@@ -98,6 +98,8 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider
{
 
   private final String statePropertyId;
 
+  private final Map<String, String> clusterComponentPortsMap;
+
   // ----- Constructors ------------------------------------------------------
 
   /**
@@ -129,10 +131,17 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider
{
     this.hostNamePropertyId       = hostNamePropertyId;
     this.componentNamePropertyId  = componentNamePropertyId;
     this.statePropertyId          = statePropertyId;
+    this.clusterComponentPortsMap = new HashMap<>();
   }
 
   // ----- helper methods ----------------------------------------------------
 
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources, Request request,
Predicate predicate) throws SystemException {
+    clusterComponentPortsMap.clear(); // start each call with an empty port cache (filled lazily by getPort)
+    return super.populateResources(resources, request, predicate); // NOTE(review): the cache is a plain HashMap; confirm getPort is not reached concurrently from pool threads
+  }
+
   /**
    * Populate a resource by obtaining the requested JMX properties.
    *
@@ -349,8 +358,14 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider
{
   }
 
   private String getPort(String clusterName, String componentName, boolean httpsEnabled)
throws SystemException {
-    String port = jmxHostProvider.getPort(clusterName, componentName, httpsEnabled);
-    return port == null ? DEFAULT_JMX_PORTS.get(componentName) : port;
+    String portMapKey = String.format("%s-%s-%s", clusterName, componentName, httpsEnabled);
+    String port = clusterComponentPortsMap.get(portMapKey);
+    if (port==null) {
+      port = jmxHostProvider.getPort(clusterName, componentName, httpsEnabled);
+      port = port == null ? DEFAULT_JMX_PORTS.get(componentName) : port;
+      clusterComponentPortsMap.put(portMapKey, port);
+    }
+    return port;
   }
 
   private String getJMXProtocol(String clusterName, String componentName) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
index 1e961a6..0e7a1bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ThreadPoolEnabledPropertyProvider.java
@@ -18,12 +18,15 @@
 
 package org.apache.ambari.server.controller.metrics;
 
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
 import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
+import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,13 +35,12 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import javax.inject.Inject;
+
 /**
  * Unites common functionality for multithreaded metrics providers
  * (JMX and REST as of now). Shares the same pool of executor threads.
@@ -56,11 +58,18 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
   /**
    * Executor service is shared between all subclasses of the current class
    */
-  private static final ExecutorService EXECUTOR_SERVICE = initExecutorService();
-  private static final int THREAD_POOL_CORE_SIZE = 20;
-  private static final int THREAD_POOL_MAX_SIZE = 100;
+  private static ThreadPoolExecutor EXECUTOR_SERVICE;
+  private static int THREAD_POOL_CORE_SIZE;
+  private static int THREAD_POOL_MAX_SIZE;
   private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
 
+  @Inject
+  public static void init(Configuration configuration) { // EXECUTOR_SERVICE stays null until this has been called
+    THREAD_POOL_CORE_SIZE = configuration.getPropertyProvidersThreadPoolCoreSize(); // pool sizes now come from Configuration
+    THREAD_POOL_MAX_SIZE = configuration.getPropertyProvidersThreadPoolMaxSize();
+    EXECUTOR_SERVICE = initExecutorService(); // NOTE(review): a pool created by an earlier call is replaced without being shut down
+  }
+
   private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
   /**
    * The amount of time that this provider will wait for JMX metric values to be
@@ -92,28 +101,17 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
   /**
    * Creates the shared thread pool from the configured core and maximum sizes
    */
-
-
-  private static ExecutorService initExecutorService() {
-    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// unlimited Queue
-
+  private static ThreadPoolExecutor initExecutorService() {
     ThreadPoolExecutor threadPoolExecutor =
-        new ThreadPoolExecutor(
+        new ScalingThreadPoolExecutor(
             THREAD_POOL_CORE_SIZE,
             THREAD_POOL_MAX_SIZE,
             THREAD_POOL_TIMEOUT_MILLIS,
-            TimeUnit.MILLISECONDS,
-            queue);
-
+            TimeUnit.MILLISECONDS);
     threadPoolExecutor.allowCoreThreadTimeOut(true);
-
     return threadPoolExecutor;
   }
 
-  public static ExecutorService getExecutorService() {
-    return EXECUTOR_SERVICE;
-  }
-
   // ----- Common PropertyProvider implementation details --------------------
 
   @Override
@@ -127,7 +125,7 @@ public abstract class ThreadPoolEnabledPropertyProvider extends AbstractProperty
     Ticket ticket = new Ticket();
 
     CompletionService<Resource> completionService =
-        new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
+        new BufferedThreadPoolExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
 
     // In a large cluster we could have thousands of resources to populate here.
     // Distribute the work across multiple threads.

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index acbf1cf..3ecb520 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -532,6 +532,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider
{
 
     String collectorHostName = null;
     String collectorPort = null;
+    Map<String, Boolean> clusterCollectorComponentLiveMap = new HashMap<>();
+    Map<String, Boolean> clusterCollectorHostLiveMap = new HashMap<>();
 
     for (Resource resource : resources) {
       String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
@@ -542,7 +544,14 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider
{
       }
 
       // Check liveliness of host
-      if (!hostProvider.isCollectorHostLive(clusterName, TIMELINE_METRICS)) {
+      boolean clusterCollectorHostLive;
+      if (clusterCollectorHostLiveMap.containsKey(clusterName)) {
+        clusterCollectorHostLive = clusterCollectorHostLiveMap.get(clusterName);
+      } else {
+        clusterCollectorHostLive = hostProvider.isCollectorComponentLive(clusterName, TIMELINE_METRICS);
+        clusterCollectorHostLiveMap.put(clusterName, clusterCollectorHostLive);
+      }
+      if (!clusterCollectorHostLive) {
         if (printSkipPopulateMsgHostCounter.getAndIncrement() == 0) {
           LOG.info("METRICS_COLLECTOR host is not live. Skip populating " +
             "resources with metrics, next message will be logged after 1000 " +
@@ -556,7 +565,14 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider
{
       printSkipPopulateMsgHostCounter.set(0);
 
       // Check liveliness of Collector
-      if (!hostProvider.isCollectorComponentLive(clusterName, TIMELINE_METRICS)) {
+      boolean clusterCollectorComponentLive;
+      if (clusterCollectorComponentLiveMap.containsKey(clusterName)) {
+        clusterCollectorComponentLive = clusterCollectorComponentLiveMap.get(clusterName);
+      } else {
+        clusterCollectorComponentLive = hostProvider.isCollectorComponentLive(clusterName,
TIMELINE_METRICS);
+        clusterCollectorComponentLiveMap.put(clusterName, clusterCollectorComponentLive);
+      }
+      if (!clusterCollectorComponentLive) {
         if (printSkipPopulateMsgHostCompCounter.getAndIncrement() == 0) {
           LOG.info("METRICS_COLLECTOR is not live. Skip populating resources " +
             "with metrics., next message will be logged after 1000 " +

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
new file mode 100644
index 0000000..4d6daa6
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/BufferedThreadPoolExecutorCompletionService.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.utilities;
+
+import java.util.Queue;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link ExecutorCompletionService} which takes a {@link ThreadPoolExecutor}
+ * and uses its thread pool to execute tasks - buffering any tasks that
+ * overflow. At most one buffered task is re-submitted to the executor each
+ * time {@code take()} or one of the {@code poll()} variants returns.
+ * <p>
+ * NOTE: the constructor replaces the {@link RejectedExecutionHandler} of the
+ * supplied executor, so on a shared executor the last instance built wins.
+ * <p>
+ * The {@link ScalingThreadPoolExecutor} can be used in conjunction with this
+ * class to provide an efficient buffered, scaling thread pool implementation.
+ *
+ * @param <V> the result type of the futures returned by this service
+ */
+public class BufferedThreadPoolExecutorCompletionService<V> extends ExecutorCompletionService<V>
{
+
+  private ThreadPoolExecutor executor;
+  private Queue<Runnable> overflowQueue; // tasks the executor rejected, waiting to be re-submitted
+
+  public BufferedThreadPoolExecutorCompletionService(ThreadPoolExecutor executor) {
+    super(executor);
+    this.executor = executor;
+    this.overflowQueue = new LinkedBlockingQueue<Runnable>();
+    this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+      /**
+       * Collects a task that the executor rejected into this instance's
+       * overflow queue so that take()/poll() can re-submit it later.
+       */
+      @Override
+      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+        overflowQueue.add(r);
+      }
+    });
+  }
+
+  @Override
+  public Future<V> take() throws InterruptedException {
+    Future<V> take = super.take();
+    if (!executor.isTerminating() && !overflowQueue.isEmpty() && executor.getActiveCount()
< executor.getMaximumPoolSize()) {
+      Runnable overflow = overflowQueue.poll(); // at most one buffered task is re-submitted per call
+      if (overflow != null) {
+        executor.execute(overflow); // a renewed rejection goes back through the executor's current handler
+      }
+    }
+    return take;
+  }
+
+  @Override
+  public Future<V> poll() {
+    Future<V> poll = super.poll(); // may be null; the re-submission below happens either way
+    if (!executor.isTerminating() && !overflowQueue.isEmpty() && executor.getActiveCount()
< executor.getMaximumPoolSize()) {
+      Runnable overflow = overflowQueue.poll(); // same single re-submission step as in take()
+      if (overflow != null) {
+        executor.execute(overflow);
+      }
+    }
+    return poll;
+  }
+
+  @Override
+  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    Future<V> poll = super.poll(timeout, unit); // may be null on timeout; the re-submission below happens either way
+    if (!executor.isTerminating() && !overflowQueue.isEmpty() && executor.getActiveCount()
< executor.getMaximumPoolSize()) {
+      Runnable overflow = overflowQueue.poll(); // same single re-submission step as in take()
+      if (overflow != null) {
+        executor.execute(overflow);
+      }
+    }
+    return poll;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
new file mode 100644
index 0000000..7a5e479
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/ScalingThreadPoolExecutor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.utilities;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link ExecutorService} built on {@link ThreadPoolExecutor} whose work
+ * queue is a {@link LinkedBlockingQueue} bounded to the core pool size.
+ * Because the queue is bounded, the pool can grow beyond its core size once
+ * the queue is full, up to the specified maximum pool size.
+ * <p>
+ * If the number of submissions exceeds the sum of the core and maximum size of
+ * the thread pool, the submissions are handed to the executor's
+ * {@link RejectedExecutionHandler}; this class does not install one itself.
+ * <p>
+ * If the overflowing submissions need to be handled,
+ * {@link BufferedThreadPoolExecutorCompletionService} can be used to buffer up
+ * overflowing submissions for later submission as threads become available.
+ *
+ * @see BufferedThreadPoolExecutorCompletionService
+ */
+public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
+
+  public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(corePoolSize)); // queue capacity equals corePoolSize
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index 7af0167..67c0979 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -533,4 +533,19 @@ public class ConfigurationTest {
     Assert.assertEquals(1000, configuration.getAlertCacheSize());
   }
 
+  @Test
+  public void testPropertyProviderThreadPoolSizes() throws Exception {
+    final Properties ambariProperties = new Properties(); // no keys set, so the defaults are expected first
+    final Configuration configuration = new Configuration(ambariProperties);
+
+    Assert.assertEquals(2 * Runtime.getRuntime().availableProcessors(), configuration.getPropertyProvidersThreadPoolCoreSize());
+    Assert.assertEquals(4 * Runtime.getRuntime().availableProcessors(), configuration.getPropertyProvidersThreadPoolMaxSize());
+
+    ambariProperties.setProperty(Configuration.PROPERTY_PROVIDER_THREADPOOL_MAX_SIZE_KEY,
"44"); // set after construction: the assertions below expect Configuration to see this change
+    ambariProperties.setProperty(Configuration.PROPERTY_PROVIDER_THREADPOOL_CORE_SIZE_KEY,
"22");
+
+    Assert.assertEquals(22, configuration.getPropertyProvidersThreadPoolCoreSize()); // explicit values override the defaults
+    Assert.assertEquals(44, configuration.getPropertyProvidersThreadPoolMaxSize());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
index 0ae3e6a..9b27447 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
@@ -23,11 +23,13 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.persist.PersistService;
 import com.google.inject.util.Modules;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
 import org.apache.ambari.server.controller.jmx.TestStreamProvider;
 import org.apache.ambari.server.controller.metrics.JMXPropertyProviderTest;
 import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
@@ -97,6 +99,8 @@ public class StackDefinedPropertyProviderTest {
   public static void setupCache() {
     cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration());
     cacheProvider = new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory);
+    Injector injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    JMXPropertyProvider.init(injector.getInstance(Configuration.class));
   }
 
   public class TestModuleWithCacheProvider implements Module {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
index f76c322..2fef7be 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.server.controller.metrics;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.internal.ResourceImpl;
@@ -30,6 +31,7 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.security.TestAuthenticationFactory;
 import org.apache.ambari.server.security.authorization.AuthorizationException;
 import org.apache.ambari.server.state.Cluster;
@@ -37,9 +39,13 @@ import org.apache.ambari.server.state.Clusters;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.springframework.security.core.context.SecurityContextHolder;
 
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
 import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,6 +70,12 @@ public class JMXPropertyProviderTest {
 
   public static final int NUMBER_OF_RESOURCES = 400;
 
+  @BeforeClass
+  public static void setupClass() {
+    Injector injector = Guice.createInjector(new InMemoryDefaultTestModule()); // used here only to obtain a Configuration instance
+    JMXPropertyProvider.init(injector.getInstance(Configuration.class)); // static init from ThreadPoolEnabledPropertyProvider: creates the shared thread pool
+  }
+
   @Before
   public void setUpCommonMocks() throws AmbariException, NoSuchFieldException, IllegalAccessException
{
     AmbariManagementController amc = createNiceMock(AmbariManagementController.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
index 220f905..de3bc82 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
@@ -20,12 +20,15 @@ package org.apache.ambari.server.controller.metrics;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
 import org.apache.ambari.server.controller.internal.ResourceImpl;
 import org.apache.ambari.server.controller.internal.StackDefinedPropertyProvider;
+import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
 import org.apache.ambari.server.controller.jmx.TestStreamProvider;
 import org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService;
 import org.apache.ambari.server.controller.spi.Request;
@@ -102,6 +105,7 @@ public class RestMetricsPropertyProviderTest {
     clusters = injector.getInstance(Clusters.class);
     clusters.addCluster("c1", new StackId("HDP-2.1.1"));
     c1 = clusters.getCluster("c1");
+    JMXPropertyProvider.init(injector.getInstance(Configuration.class));
 
     // Setting up Mocks for Controller, Clusters etc, queried as part of user's Role context
     // while fetching Metrics.

http://git-wip-us.apache.org/repos/asf/ambari/blob/5acbc4d8/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java
new file mode 100644
index 0000000..f47068c
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.test;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
+import org.junit.Test;
+
+public class BufferedThreadPoolExecutorCompletionServiceTest {
+
+  private void longOp() throws InterruptedException { // simulated slow task; every test in this class submits it
+    Thread.sleep(3000); // stays busy for 3 s, well past the 500 ms mark at which the tests sample getActiveCount()
+    System.out.println("Completed " + Thread.currentThread()); // reports which thread ran the task
+  }
+
+  /**
+   * Checks that with an unbounded work queue the executor is never observed with more than
+   * {@link ThreadPoolExecutor#getCorePoolSize()} active threads, although MAX_POOL_SIZE is larger.
+   * 
+   * @throws InterruptedException if the test thread is interrupted while waiting
+   */
+  @Test
+  public void testOnlyCorePoolThreadsLaunchedForUnboundedQueue() throws InterruptedException
{
+    int CORE_POOL_SIZE = 2;
+    int MAX_POOL_SIZE = 5;
+    int TASKS_COUNT = 8; // more tasks than MAX_POOL_SIZE, so a pool that grew past core size would be noticed
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // no capacity argument: unbounded
+    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
30000, TimeUnit.MILLISECONDS, queue);
+    BufferedThreadPoolExecutorCompletionService<Runnable> service = new BufferedThreadPoolExecutorCompletionService<>(threadPoolExecutor);
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      service.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            longOp();
+          } catch (InterruptedException e) {
+            e.printStackTrace(); // interruption is only printed; the task then ends normally
+          }
+        }
+      }, null); // presumably the task's result value, as in CompletionService#submit(Runnable, V) - TODO confirm
+    }
+
+    // Sample the pool 500 ms in, while the started tasks are still inside longOp()'s 3 s sleep
+    Thread.sleep(500);
+    Assert.assertEquals(CORE_POOL_SIZE, threadPoolExecutor.getActiveCount()); // exactly the core threads are busy
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      Future<Runnable> take = service.take(); // one future is taken per submitted task
+      Assert.assertTrue(take.isDone());
+      Assert.assertTrue("No more than CORE_POOL_SIZE threads should be launched", threadPoolExecutor.getActiveCount()
<= CORE_POOL_SIZE);
+    }
+    threadPoolExecutor.shutdown();
+  }
+
+  /**
+   * Checks that with a bounded queue (capacity CORE_POOL_SIZE) and a load above core-pool-size
+   * but below max-pool-size, the number of active threads grows beyond the core size.
+   * 
+   * @throws InterruptedException if the test thread is interrupted while waiting
+   */
+  @Test
+  public void testLessThanMaxPoolSizeThreadsLaunched() throws InterruptedException {
+    int CORE_POOL_SIZE = 2;
+    int MAX_POOL_SIZE = 10;
+    int TASKS_COUNT = 8; // above CORE_POOL_SIZE, below MAX_POOL_SIZE
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(CORE_POOL_SIZE); // bounded: capacity 2
+    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
30000, TimeUnit.MILLISECONDS, queue);
+    BufferedThreadPoolExecutorCompletionService<Runnable> service = new BufferedThreadPoolExecutorCompletionService<>(threadPoolExecutor);
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      service.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            longOp();
+          } catch (InterruptedException e) {
+            e.printStackTrace(); // interruption is only printed; the task then ends normally
+          }
+        }
+      }, null); // presumably the task's result value, as in CompletionService#submit(Runnable, V) - TODO confirm
+    }
+
+    // Sample the pool 500 ms in, while the started tasks are still inside longOp()'s 3 s sleep
+    Thread.sleep(500);
+    Assert.assertEquals(TASKS_COUNT - CORE_POOL_SIZE, threadPoolExecutor.getActiveCount()); // expects 6 busy threads; presumably the other 2 tasks wait in the queue, whose capacity is also CORE_POOL_SIZE
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      Future<Runnable> take = service.take(); // one future is taken per submitted task
+      Assert.assertTrue(take.isDone());
+      Assert.assertTrue("No more than TASKS_COUNT threads should be launched", threadPoolExecutor.getActiveCount()
<= TASKS_COUNT); // loose upper bound: at most one thread per task
+    }
+    threadPoolExecutor.shutdown();
+  }
+
+  /**
+   * Checks that when more tasks are submitted (24) than MAX_POOL_SIZE threads plus the 2 queue slots
+   * can hold, the active thread count reaches MAX_POOL_SIZE and is never observed above it.
+   * 
+   * @throws InterruptedException if the test thread is interrupted while waiting
+   */
+  @Test
+  public void testMaxPoolSizeThreadsLaunched() throws InterruptedException {
+    int CORE_POOL_SIZE = 2;
+    int MAX_POOL_SIZE = 10;
+    int TASKS_COUNT = 24; // exceeds MAX_POOL_SIZE (10) plus the queue capacity (2)
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(CORE_POOL_SIZE); // bounded: capacity 2
+    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
30000, TimeUnit.MILLISECONDS, queue);
+    BufferedThreadPoolExecutorCompletionService<Runnable> service = new BufferedThreadPoolExecutorCompletionService<>(threadPoolExecutor);
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      service.submit(new Runnable() { // NOTE(review): presumably the service holds back tasks the executor cannot accept; its code is not shown here
+        @Override
+        public void run() {
+          try {
+            longOp();
+          } catch (InterruptedException e) {
+            e.printStackTrace(); // interruption is only printed; the task then ends normally
+          }
+        }
+      }, null); // presumably the task's result value, as in CompletionService#submit(Runnable, V) - TODO confirm
+    }
+
+    // Sample the pool 500 ms in, while the started tasks are still inside longOp()'s 3 s sleep
+    Thread.sleep(500);
+    Assert.assertEquals(MAX_POOL_SIZE, threadPoolExecutor.getActiveCount()); // pool is expected to be saturated
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      Future<Runnable> take = service.take(); // one future is taken per submitted task
+      Assert.assertTrue(take.isDone());
+      Assert.assertTrue("No more than MAX_POOL_SIZE threads should be launched", threadPoolExecutor.getActiveCount()
<= MAX_POOL_SIZE);
+    }
+    threadPoolExecutor.shutdown();
+  }
+
+  /**
+   * Checks that a ScalingThreadPoolExecutor, created here without an explicit work queue, reaches
+   * MAX_POOL_SIZE active threads under a load of 24 tasks and is never observed above that size.
+   * 
+   * @throws InterruptedException if the test thread is interrupted while waiting
+   */
+  @Test
+  public void testScalingThreadPoolExecutor() throws InterruptedException {
+    int CORE_POOL_SIZE = 2;
+    int MAX_POOL_SIZE = 10;
+    int TASKS_COUNT = 24; // more tasks than MAX_POOL_SIZE
+    ThreadPoolExecutor threadPoolExecutor = new ScalingThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE, 30000, TimeUnit.MILLISECONDS); // no queue is passed; presumably the class supplies its own - TODO confirm
+    BufferedThreadPoolExecutorCompletionService<Runnable> service = new BufferedThreadPoolExecutorCompletionService<>(threadPoolExecutor);
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      service.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            longOp();
+          } catch (InterruptedException e) {
+            e.printStackTrace(); // interruption is only printed; the task then ends normally
+          }
+        }
+      }, null); // presumably the task's result value, as in CompletionService#submit(Runnable, V) - TODO confirm
+    }
+
+    // Sample the pool 500 ms in, while the started tasks are still inside longOp()'s 3 s sleep
+    Thread.sleep(500);
+    Assert.assertEquals(MAX_POOL_SIZE, threadPoolExecutor.getActiveCount()); // pool is expected to be saturated
+    for (int tc = 0; tc < TASKS_COUNT; tc++) {
+      Future<Runnable> take = service.take(); // one future is taken per submitted task
+      Assert.assertTrue(take.isDone());
+      Assert.assertTrue("No more than MAX_POOL_SIZE threads should be launched", threadPoolExecutor.getActiveCount()
<= MAX_POOL_SIZE);
+    }
+    threadPoolExecutor.shutdown();
+  }
+}


Mime
View raw message