activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-312 Allow configurable of, and inject of client global thread pools
Date Wed, 16 Dec 2015 23:21:13 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master f4235a6b9 -> f8741db4a


ARTEMIS-312 Allow configurable of, and inject of client global thread pools


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e8f2f39
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e8f2f39
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e8f2f39

Branch: refs/heads/master
Commit: 0e8f2f39af666f50435b784c41c3d3767cfc4f7e
Parents: f4235a6
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Dec 11 17:53:25 2015 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Dec 16 18:19:25 2015 -0500

----------------------------------------------------------------------
 .../artemis/api/core/client/ActiveMQClient.java | 178 ++++++++++++-
 .../artemis/api/core/client/ServerLocator.java  |   8 +-
 .../core/client/impl/ServerLocatorImpl.java     |  92 ++-----
 .../activemq/artemis/ClientThreadPoolsTest.java | 261 +++++++++++++++++++
 .../artemis/tests/util/ActiveMQTestBase.java    |   6 +-
 .../integration/client/CoreClientTest.java      |  60 ++++-
 6 files changed, 515 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 1bb8d38..11918e1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -16,14 +16,26 @@
  */
 package org.apache.activemq.artemis.api.core.client;
 
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import java.net.URI;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.uri.ServerLocatorParser;
-
-import java.net.URI;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 
 /**
  * Utility class for creating ActiveMQ Artemis {@link ClientSessionFactory} objects.
@@ -34,6 +46,10 @@ import java.net.URI;
  */
 public final class ActiveMQClient {
 
+   public static int globalThreadMaxPoolSize;
+
+   public static int globalScheduledThreadPoolSize;
+
    public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
 
    public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod();
@@ -102,6 +118,8 @@ public final class ActiveMQClient {
 
    public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
 
+   public static final int DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE = 500;
+
    public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
 
    public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
@@ -114,6 +132,160 @@ public final class ActiveMQClient {
 
    public static final String DEFAULT_CORE_PROTOCOL = "CORE";
 
+   public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
+
+   public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";
+
+   private static ThreadPoolExecutor globalThreadPool;
+
+   private static boolean injectedPools = false;
+
+   private static ScheduledThreadPoolExecutor globalScheduledThreadPool;
+
+
+   static {
+      initializeGlobalThreadPoolProperties();
+   }
+
+   public static synchronized void clearThreadPools() {
+      clearThreadPools(10, TimeUnit.SECONDS);
+   }
+
+
+   public static synchronized void clearThreadPools(long time, TimeUnit unit) {
+
+      if (injectedPools) {
+         globalThreadPool = null;
+         globalScheduledThreadPool = null;
+         injectedPools = false;
+         return;
+      }
+
+      if (globalThreadPool != null) {
+         globalThreadPool.shutdown();
+         try {
+            if (!globalThreadPool.awaitTermination(time, unit)) {
+               globalThreadPool.shutdownNow();
+               ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client globalThreadPool
in less than 10 seconds, interrupting it now");
+            }
+         }
+         catch (InterruptedException e) {
+            throw new ActiveMQInterruptedException(e);
+         }
+         finally {
+            globalThreadPool = null;
+         }
+      }
+
+      if (globalScheduledThreadPool != null) {
+         globalScheduledThreadPool.shutdown();
+         try {
+            if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
+               globalScheduledThreadPool.shutdownNow();
+               ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client scheduled in
less than 10 seconds, interrupting it now");
+            }
+         }
+         catch (InterruptedException e) {
+            throw new ActiveMQInterruptedException(e);
+         }
+         finally {
+            globalScheduledThreadPool = null;
+         }
+      }
+   }
+
+   /** Warning: This method has to be called before any clients or servers is started on
the JVM otherwise previous ServerLocator would be broken after this call. */
+   public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor
scheduledThreadPoolExecutor) {
+
+      // We call clearThreadPools as that will shutdown any previously used executor
+      clearThreadPools();
+
+      ActiveMQClient.globalThreadPool = globalThreadPool;
+      ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor;
+      injectedPools = true;
+   }
+
+   public static synchronized ThreadPoolExecutor getGlobalThreadPool() {
+      if (globalThreadPool == null) {
+         ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>()
{
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
+            }
+         });
+
+         if (globalThreadMaxPoolSize == -1) {
+            globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), factory);
+         }
+         else {
+            globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize,
ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
factory);
+         }
+      }
+      return globalThreadPool;
+   }
+
+   public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool()
{
+      if (globalScheduledThreadPool == null) {
+         ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>()
{
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads",
true, ClientSessionFactoryImpl.class.getClassLoader());
+            }
+         });
+
+         globalScheduledThreadPool =  new ScheduledThreadPoolExecutor(ActiveMQClient.globalScheduledThreadPoolSize,
factory);
+      }
+      return globalScheduledThreadPool;
+   }
+
+
+
+
+   /**
+    * (Re)Initializes the global thread pools properties from System properties.  This method
will update the global
+    * thread pool configuration based on defined System properties (or defaults if they are
not set) notifying
+    * all globalThreadPoolListeners.  The System properties key names are as follow:
+    *
+    * ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size"
+    * ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size
+    *
+    * The min value for max thread pool size is 2.  Providing a value lower than 2 will be
ignored and will defaul to 2.
+    *
+    * Note.  The ServerLocatorImpl registers a listener and uses it to configure it's global
thread pools.  If global
+    * thread pools have already been created, they will be updated with these new values.
+    */
+   public static void initializeGlobalThreadPoolProperties() {
+
+      setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY,
"" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY,
"" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
+   }
+
+   /**
+    * Allows programatically configuration of global thread pools properties.  This method
will update the global
+    * thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
+    *
+    * Note.  The ServerLocatorImpl registers a listener and uses it to configure it's global
thread pools.  If global
+    * thread pools have already been created, they will be updated with these new values.
+    *
+    * The min value for max thread pool size is 2.  Providing a value lower than 2 will be
ignored and will default to 2.
+    */
+   public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize)
{
+
+      if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2;
+
+      ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
+      ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize;
+
+      // if injected, we won't do anything with the pool as they're not ours
+      if (!injectedPools) {
+         // Right now I'm ignoring the corePool size on purpose as there's no way to have
two values for the number of threads
+         // this is basically a fixed size thread pool (although the pool grows on demand)
+         getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize);
+         getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize);
+
+         getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize);
+      }
+   }
+
    /**
     * Creates an ActiveMQConnectionFactory;
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index af65bf5..db4b6fd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -16,9 +16,12 @@
  */
 package org.apache.activemq.artemis.api.core.client;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
 
@@ -784,6 +787,5 @@ public interface ServerLocator extends AutoCloseable {
 
    String getOutgoingInterceptorList();
 
-
-
+   boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index a483f5d..77e1e66 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -187,12 +187,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
    private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<>();
 
-   private static ExecutorService globalThreadPool;
-
    private Executor startExecutor;
 
-   private static ScheduledExecutorService globalScheduledThreadPool;
-
    private AfterConnectInternalListener afterConnectListener;
 
    private String groupID;
@@ -208,68 +204,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
    public static Runnable finalizeCallback = null;
 
    public static synchronized void clearThreadPools() {
-
-      if (globalThreadPool != null) {
-         globalThreadPool.shutdown();
-         try {
-            if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-               throw new IllegalStateException("Couldn't finish the globalThreadPool");
-            }
-         }
-         catch (InterruptedException e) {
-            throw new ActiveMQInterruptedException(e);
-         }
-         finally {
-            globalThreadPool = null;
-         }
-      }
-
-      if (globalScheduledThreadPool != null) {
-         globalScheduledThreadPool.shutdown();
-         try {
-            if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-               throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
-            }
-         }
-         catch (InterruptedException e) {
-            throw new ActiveMQInterruptedException(e);
-         }
-         finally {
-            globalScheduledThreadPool = null;
-         }
-      }
-   }
-
-   private static synchronized ExecutorService getGlobalThreadPool() {
-      if (globalThreadPool == null) {
-         ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>()
{
-            @Override
-            public ThreadFactory run() {
-               return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
-            }
-         });
-
-         globalThreadPool = Executors.newCachedThreadPool(factory);
-      }
-
-      return globalThreadPool;
-   }
-
-   private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
-      if (globalScheduledThreadPool == null) {
-         ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>()
{
-            @Override
-            public ThreadFactory run() {
-               return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads",
true, ClientSessionFactoryImpl.class.getClassLoader());
-            }
-         });
-
-         globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
-               factory);
-      }
-
-      return globalScheduledThreadPool;
+      ActiveMQClient.clearThreadPools();
    }
 
    private synchronized void setThreadPools() {
@@ -277,9 +212,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
          return;
       }
       else if (useGlobalPools) {
-         threadPool = getGlobalThreadPool();
+         threadPool = ActiveMQClient.getGlobalThreadPool();
 
-         scheduledThreadPool = getGlobalScheduledThreadPool();
+         scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
       }
       else {
          this.shutdownPool = true;
@@ -309,6 +244,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       }
    }
 
+   @Override
+   public synchronized boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService
scheduledThreadPool) {
+
+      if (threadPool == null || scheduledThreadPool == null) return false;
+
+      if (this.threadPool == null && this.scheduledThreadPool == null) {
+         useGlobalPools = false;
+         shutdownPool = false;
+         this.threadPool = threadPool;
+         this.scheduledThreadPool = scheduledThreadPool;
+         return true;
+      }
+      else {
+         return false;
+      }
+   }
+
    private void instantiateLoadBalancingPolicy() {
       if (connectionLoadBalancingPolicyClassName == null) {
          throw new IllegalStateException("Please specify a load balancing policy class name
on the session factory");
@@ -409,10 +361,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
       useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
 
-      scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
       threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
 
+      scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
       retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
 
       retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java
new file mode 100644
index 0000000..57399cd
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ClientThreadPoolsTest {
+
+   private static Properties systemProperties;
+
+   @BeforeClass
+   public static void setup() {
+      systemProperties = System.getProperties();
+   }
+
+   @AfterClass
+   public static void tearDown() {
+      System.clearProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY);
+      System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY);
+      ActiveMQClient.initializeGlobalThreadPoolProperties();
+      ActiveMQClient.clearThreadPools();
+      Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize);
+   }
+
+   @Test
+   public void testSystemPropertyThreadPoolSettings() throws Exception {
+      int threadPoolMaxSize = 100;
+      int scheduledThreadPoolSize = 10;
+
+      System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
+      System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
+      ActiveMQClient.initializeGlobalThreadPoolProperties();
+
+      testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
+   }
+
+   @Test
+   public void testShutdownPoolInUse() throws Exception {
+      ActiveMQClient.clearThreadPools();
+      ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
+
+      final CountDownLatch inUse = new CountDownLatch(1);
+      final CountDownLatch neverLeave = new CountDownLatch(1);
+
+      ActiveMQClient.getGlobalThreadPool().execute(new Runnable() {
+         @Override
+         public void run() {
+            System.err.println("Hello!");
+            try {
+               inUse.countDown();
+               neverLeave.await();
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+               neverLeave.countDown();
+            }
+         }
+      });
+
+      Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
+      ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS);
+      Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
+   }
+
+   @Test
+   public void testInjectPools() throws Exception {
+      ActiveMQClient.clearThreadPools();
+
+      ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
+                                                               0L, TimeUnit.MILLISECONDS,
+                                                               new LinkedBlockingQueue<Runnable>());
+
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);
+
+      ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);
+
+      final CountDownLatch inUse = new CountDownLatch(1);
+      final CountDownLatch neverLeave = new CountDownLatch(1);
+
+      ActiveMQClient.getGlobalThreadPool().execute(new Runnable() {
+         @Override
+         public void run() {
+            System.err.println("Hello!");
+            try {
+               inUse.countDown();
+               neverLeave.await();
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+               neverLeave.countDown();
+            }
+         }
+      });
+
+
+      Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
+      poolExecutor.shutdownNow();
+      scheduledThreadPoolExecutor.shutdownNow();
+      Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
+
+      Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
+      Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
+
+      ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS);
+   }
+
+   @Test
+   public void testStaticPropertiesThreadPoolSettings() throws Exception {
+
+      int testMaxSize = 999;
+      int testScheduleSize = 9;
+
+      ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
+      testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
+   }
+
+   @Test
+   public void testSmallPool() throws Exception {
+
+      int testMaxSize = 2;
+      int testScheduleSize = 9;
+
+      ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
+      testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
+   }
+
+   private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled)
throws Exception {
+      ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
+      serverLocator.isUseGlobalPools();
+
+      Method setThreadPools = ServerLocatorImpl.class.getDeclaredMethod("setThreadPools");
+      setThreadPools.setAccessible(true);
+      setThreadPools.invoke(serverLocator);
+
+      // TODO: I would get this from the ActiveMQClient
+      Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
+      Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
+
+      threadPoolField.setAccessible(true);
+      scheduledThreadPoolField.setAccessible(true);
+
+      ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool();
+
+      final CountDownLatch doneMax = new CountDownLatch(expectedMax);
+      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch latchTotal = new CountDownLatch(expectedMax * 3); // we will schedule
3 * max, so all runnables should execute
+      final AtomicInteger errors = new AtomicInteger(0);
+
+
+      // Set this to true if you need to debug why executions are not being performed.
+      final boolean debugExecutions = false;
+
+      for (int i = 0; i < expectedMax * 3; i++) {
+         final int localI = i;
+         threadPool.execute(new Runnable() {
+            @Override
+            public void run() {
+               try {
+
+                  if (debugExecutions) {
+                     System.out.println("runnable " + localI);
+                  }
+                  doneMax.countDown();
+                  latch.await();
+                  latchTotal.countDown();
+               }
+               catch (Exception e) {
+                  errors.incrementAndGet();
+               }
+               finally {
+                  if (debugExecutions) {
+                     System.out.println("done " + localI);
+                  }
+               }
+            }
+         });
+      }
+
+      Assert.assertTrue(doneMax.await(5, TimeUnit.SECONDS));
+      latch.countDown();
+      Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS));
+
+
+      ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
+
+      // TODO: We need to figure out what to do with getCorePoolSize
+      assertEquals(expectedMax, threadPool.getCorePoolSize());
+      assertEquals(expectedMax, threadPool.getMaximumPoolSize());
+      assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
+   }
+
+   @Test
+   public void testThreadPoolInjection() throws Exception {
+
+      ServerLocator serverLocator = new ServerLocatorImpl(false);
+
+      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
+      ScheduledThreadPoolExecutor scheduledThreadPool =  new ScheduledThreadPoolExecutor(1);
+      serverLocator.setThreadPools(threadPool, scheduledThreadPool);
+
+      Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
+      Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
+
+      Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise");
+      initialise.setAccessible(true);
+      initialise.invoke(serverLocator);
+
+      threadPoolField.setAccessible(true);
+      scheduledThreadPoolField.setAccessible(true);
+
+      ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator);
+      ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
+
+      assertEquals(threadPool, tpe);
+      assertEquals(scheduledThreadPool, stpe);
+   }
+
+   @After
+   public void cleanup() {
+      // Resets the global thread pool properties back to default.
+      System.setProperties(systemProperties);
+      ActiveMQClient.initializeGlobalThreadPoolProperties();
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index a75b73c..41916fe 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2403,8 +2403,10 @@ public abstract class ActiveMQTestBase extends Assert {
    protected void closeAllSessionFactories() {
       synchronized (sessionFactories) {
          for (ClientSessionFactory sf : sessionFactories) {
-            closeSessionFactory(sf);
-            assert sf.isClosed();
+            if (!sf.isClosed()) {
+               closeSessionFactory(sf);
+               assert sf.isClosed();
+            }
          }
          sessionFactories.clear();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
index 18ef280..a4ef71a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
@@ -16,8 +16,14 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -36,33 +42,61 @@ public class CoreClientTest extends ActiveMQTestBase {
 
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
 
-   // Constants -----------------------------------------------------
+   @Test
+   public void testCoreClientNetty() throws Exception {
+      testCoreClient(true, null);
+   }
 
-   // Attributes ----------------------------------------------------
+   @Test
+   public void testCoreClientInVM() throws Exception {
+      testCoreClient(false, null);
+   }
 
-   // Static --------------------------------------------------------
+   @Test
+   public void testCoreClientWithInjectedThreadPools() throws Exception {
 
-   // Constructors --------------------------------------------------
+      ExecutorService threadPool = Executors.newCachedThreadPool();
+      ScheduledThreadPoolExecutor scheduledThreadPool =  new ScheduledThreadPoolExecutor(10);
 
-   // Public --------------------------------------------------------
+      ServerLocator locator =  createNonHALocator(false);
+      boolean setThreadPools = locator.setThreadPools(threadPool, scheduledThreadPool);
 
-   @Test
-   public void testCoreClientNetty() throws Exception {
-      testCoreClient(true);
+      assertTrue(setThreadPools);
+      testCoreClient(true, locator);
+
+      threadPool.shutdown();
+      scheduledThreadPool.shutdown();
+
+      threadPool.awaitTermination(60, TimeUnit.SECONDS);
+      scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS);
    }
 
    @Test
-   public void testCoreClientInVM() throws Exception {
-      testCoreClient(false);
+   public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception {
+
+      int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize;
+      int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize;
+
+      try {
+         ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
+         ServerLocator locator = createNonHALocator(false);
+         testCoreClient(true, locator);
+      }
+      finally {
+         // restoring original value otherwise future tests would be screwed up
+         ActiveMQClient.setGlobalThreadPoolProperties(originalGlobal, originalScheduled);
+         ActiveMQClient.clearThreadPools();
+      }
    }
 
-   private void testCoreClient(final boolean netty) throws Exception {
+   private void testCoreClient(final boolean netty, ServerLocator serverLocator) throws Exception
{
       final SimpleString QUEUE = new SimpleString("CoreClientTestQueue");
 
       ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(netty),
false));
 
       server.start();
-      ServerLocator locator = createNonHALocator(netty);
+
+      ServerLocator locator = serverLocator == null ? createNonHALocator(netty) : serverLocator;
 
       ClientSessionFactory sf = createSessionFactory(locator);
 
@@ -105,5 +139,7 @@ public class CoreClientTest extends ActiveMQTestBase {
 
          message2.acknowledge();
       }
+
+      sf.close();
    }
 }


Mime
View raw message