Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 19A7B183AF for ; Wed, 16 Dec 2015 23:21:14 +0000 (UTC) Received: (qmail 8263 invoked by uid 500); 16 Dec 2015 23:21:14 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 8221 invoked by uid 500); 16 Dec 2015 23:21:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 8212 invoked by uid 99); 16 Dec 2015 23:21:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Dec 2015 23:21:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C5830DFFB9; Wed, 16 Dec 2015 23:21:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 16 Dec 2015 23:21:13 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-312 Allow configurable of, and inject of client global thread pools Repository: activemq-artemis Updated Branches: refs/heads/master f4235a6b9 -> f8741db4a ARTEMIS-312 Allow configurable of, and inject of client global thread pools Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e8f2f39 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e8f2f39 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e8f2f39 Branch: refs/heads/master Commit: 0e8f2f39af666f50435b784c41c3d3767cfc4f7e Parents: f4235a6 Author: Martyn Taylor Authored: Fri Dec 11 17:53:25 2015 +0000 Committer: Clebert Suconic Committed: Wed Dec 16 18:19:25 2015 -0500 ---------------------------------------------------------------------- .../artemis/api/core/client/ActiveMQClient.java | 178 ++++++++++++- .../artemis/api/core/client/ServerLocator.java | 8 +- .../core/client/impl/ServerLocatorImpl.java | 92 ++----- .../activemq/artemis/ClientThreadPoolsTest.java | 261 +++++++++++++++++++ .../artemis/tests/util/ActiveMQTestBase.java | 6 +- .../integration/client/CoreClientTest.java | 60 ++++- 6 files changed, 515 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index 1bb8d38..11918e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -16,14 +16,26 @@ */ package org.apache.activemq.artemis.api.core.client; -import org.apache.activemq.artemis.api.core.TransportConfiguration; +import java.net.URI; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.uri.ServerLocatorParser; - -import java.net.URI; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; /** * Utility class for creating ActiveMQ Artemis {@link ClientSessionFactory} objects. @@ -34,6 +46,10 @@ import java.net.URI; */ public final class ActiveMQClient { + public static int globalThreadMaxPoolSize; + + public static int globalScheduledThreadPoolSize; + public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName(); public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod(); @@ -102,6 +118,8 @@ public final class ActiveMQClient { public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1; + public static final int DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE = 500; + public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5; public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false; @@ -114,6 +132,160 @@ public final class ActiveMQClient { public static final String DEFAULT_CORE_PROTOCOL = "CORE"; + public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size"; + + public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size"; + + private static ThreadPoolExecutor globalThreadPool; + + private static boolean injectedPools = false; + + private static ScheduledThreadPoolExecutor globalScheduledThreadPool; + + + static { + initializeGlobalThreadPoolProperties(); + } + + public static synchronized void clearThreadPools() { + clearThreadPools(10, TimeUnit.SECONDS); + } + + + public static synchronized void clearThreadPools(long time, TimeUnit unit) { + + if (injectedPools) { + globalThreadPool = null; + globalScheduledThreadPool = null; + injectedPools = false; + return; + } + + if (globalThreadPool != null) { + globalThreadPool.shutdown(); + try { + if (!globalThreadPool.awaitTermination(time, unit)) { + globalThreadPool.shutdownNow(); + ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client globalThreadPool in less than 10 seconds, interrupting it now"); + } + } + catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + finally { + globalThreadPool = null; + } + } + + if (globalScheduledThreadPool != null) { + globalScheduledThreadPool.shutdown(); + try { + if (!globalScheduledThreadPool.awaitTermination(time, unit)) { + globalScheduledThreadPool.shutdownNow(); + ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client scheduled in less than 10 seconds, interrupting it now"); + } + } + catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + finally { + globalScheduledThreadPool = null; + } + } + } + + /** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */ + public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { + + // We call clearThreadPools as that will shutdown any previously used executor + clearThreadPools(); + + ActiveMQClient.globalThreadPool = globalThreadPool; + ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor; + injectedPools = true; + } + + public static synchronized ThreadPoolExecutor getGlobalThreadPool() { + if (globalThreadPool == null) { + ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ThreadFactory run() { + return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + }); + + if (globalThreadMaxPoolSize == -1) { + globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), factory); + } + else { + globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue(), factory); + } + } + return globalThreadPool; + } + + public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool() { + if (globalScheduledThreadPool == null) { + ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ThreadFactory run() { + return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + }); + + globalScheduledThreadPool = new ScheduledThreadPoolExecutor(ActiveMQClient.globalScheduledThreadPoolSize, factory); + } + return globalScheduledThreadPool; + } + + + + + /** + * (Re)Initializes the global thread pools properties from System properties. This method will update the global + * thread pool configuration based on defined System properties (or defaults if they are not set) notifying + * all globalThreadPoolListeners. The System properties key names are as follow: + * + * ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size" + * ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size + * + * The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will defaul to 2. + * + * Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global + * thread pools have already been created, they will be updated with these new values. + */ + public static void initializeGlobalThreadPoolProperties() { + + setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE))); + } + + /** + * Allows programatically configuration of global thread pools properties. This method will update the global + * thread pool configuration based on the provided values notifying all globalThreadPoolListeners. + * + * Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global + * thread pools have already been created, they will be updated with these new values. + * + * The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will default to 2. + */ + public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) { + + if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2; + + ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize; + ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize; + + // if injected, we won't do anything with the pool as they're not ours + if (!injectedPools) { + // Right now I'm ignoring the corePool size on purpose as there's no way to have two values for the number of threads + // this is basically a fixed size thread pool (although the pool grows on demand) + getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize); + getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize); + + getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize); + } + } + /** * Creates an ActiveMQConnectionFactory; * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index af65bf5..db4b6fd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.artemis.api.core.client; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; @@ -784,6 +787,5 @@ public interface ServerLocator extends AutoCloseable { String getOutgoingInterceptorList(); - - + boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index a483f5d..77e1e66 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -187,12 +187,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private final List outgoingInterceptors = new CopyOnWriteArrayList<>(); - private static ExecutorService globalThreadPool; - private Executor startExecutor; - private static ScheduledExecutorService globalScheduledThreadPool; - private AfterConnectInternalListener afterConnectListener; private String groupID; @@ -208,68 +204,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public static Runnable finalizeCallback = null; public static synchronized void clearThreadPools() { - - if (globalThreadPool != null) { - globalThreadPool.shutdown(); - try { - if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS)) { - throw new IllegalStateException("Couldn't finish the globalThreadPool"); - } - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - finally { - globalThreadPool = null; - } - } - - if (globalScheduledThreadPool != null) { - globalScheduledThreadPool.shutdown(); - try { - if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) { - throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool"); - } - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - finally { - globalScheduledThreadPool = null; - } - } - } - - private static synchronized ExecutorService getGlobalThreadPool() { - if (globalThreadPool == null) { - ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public ThreadFactory run() { - return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); - } - }); - - globalThreadPool = Executors.newCachedThreadPool(factory); - } - - return globalThreadPool; - } - - private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() { - if (globalScheduledThreadPool == null) { - ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public ThreadFactory run() { - return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); - } - }); - - globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, - - factory); - } - - return globalScheduledThreadPool; + ActiveMQClient.clearThreadPools(); } private synchronized void setThreadPools() { @@ -277,9 +212,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return; } else if (useGlobalPools) { - threadPool = getGlobalThreadPool(); + threadPool = ActiveMQClient.getGlobalThreadPool(); - scheduledThreadPool = getGlobalScheduledThreadPool(); + scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool(); } else { this.shutdownPool = true; @@ -309,6 +244,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override + public synchronized boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPool) { + + if (threadPool == null || scheduledThreadPool == null) return false; + + if (this.threadPool == null && this.scheduledThreadPool == null) { + useGlobalPools = false; + shutdownPool = false; + this.threadPool = threadPool; + this.scheduledThreadPool = scheduledThreadPool; + return true; + } + else { + return false; + } + } + private void instantiateLoadBalancingPolicy() { if (connectionLoadBalancingPolicyClassName == null) { throw new IllegalStateException("Please specify a load balancing policy class name on the session factory"); @@ -409,10 +361,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS; - scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE; - threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE; + scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE; + retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL; retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java new file mode 100644 index 0000000..57399cd --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ClientThreadPoolsTest { + + private static Properties systemProperties; + + @BeforeClass + public static void setup() { + systemProperties = System.getProperties(); + } + + @AfterClass + public static void tearDown() { + System.clearProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY); + System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY); + ActiveMQClient.initializeGlobalThreadPoolProperties(); + ActiveMQClient.clearThreadPools(); + Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize); + } + + @Test + public void testSystemPropertyThreadPoolSettings() throws Exception { + int threadPoolMaxSize = 100; + int scheduledThreadPoolSize = 10; + + System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize); + System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize); + ActiveMQClient.initializeGlobalThreadPoolProperties(); + + testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize); + } + + @Test + public void testShutdownPoolInUse() throws Exception { + ActiveMQClient.clearThreadPools(); + ActiveMQClient.setGlobalThreadPoolProperties(10, 1); + + final CountDownLatch inUse = new CountDownLatch(1); + final CountDownLatch neverLeave = new CountDownLatch(1); + + ActiveMQClient.getGlobalThreadPool().execute(new Runnable() { + @Override + public void run() { + System.err.println("Hello!"); + try { + inUse.countDown(); + neverLeave.await(); + } + catch (Exception e) { + e.printStackTrace(); + neverLeave.countDown(); + } + } + }); + + Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS)); + ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS); + Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS)); + } + + @Test + public void testInjectPools() throws Exception { + ActiveMQClient.clearThreadPools(); + + ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1); + + ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor); + + final CountDownLatch inUse = new CountDownLatch(1); + final CountDownLatch neverLeave = new CountDownLatch(1); + + ActiveMQClient.getGlobalThreadPool().execute(new Runnable() { + @Override + public void run() { + System.err.println("Hello!"); + try { + inUse.countDown(); + neverLeave.await(); + } + catch (Exception e) { + e.printStackTrace(); + neverLeave.countDown(); + } + } + }); + + + Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS)); + poolExecutor.shutdownNow(); + scheduledThreadPoolExecutor.shutdownNow(); + Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS)); + + Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS)); + Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS)); + + ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS); + } + + @Test + public void testStaticPropertiesThreadPoolSettings() throws Exception { + + int testMaxSize = 999; + int testScheduleSize = 9; + + ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize); + testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize); + } + + @Test + public void testSmallPool() throws Exception { + + int testMaxSize = 2; + int testScheduleSize = 9; + + ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize); + testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize); + } + + private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled) throws Exception { + ServerLocatorImpl serverLocator = new ServerLocatorImpl(false); + serverLocator.isUseGlobalPools(); + + Method setThreadPools = ServerLocatorImpl.class.getDeclaredMethod("setThreadPools"); + setThreadPools.setAccessible(true); + setThreadPools.invoke(serverLocator); + + // TODO: I would get this from the ActiveMQClient + Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool"); + Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool"); + + threadPoolField.setAccessible(true); + scheduledThreadPoolField.setAccessible(true); + + ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool(); + + final CountDownLatch doneMax = new CountDownLatch(expectedMax); + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latchTotal = new CountDownLatch(expectedMax * 3); // we will schedule 3 * max, so all runnables should execute + final AtomicInteger errors = new AtomicInteger(0); + + + // Set this to true if you need to debug why executions are not being performed. + final boolean debugExecutions = false; + + for (int i = 0; i < expectedMax * 3; i++) { + final int localI = i; + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + + if (debugExecutions) { + System.out.println("runnable " + localI); + } + doneMax.countDown(); + latch.await(); + latchTotal.countDown(); + } + catch (Exception e) { + errors.incrementAndGet(); + } + finally { + if (debugExecutions) { + System.out.println("done " + localI); + } + } + } + }); + } + + Assert.assertTrue(doneMax.await(5, TimeUnit.SECONDS)); + latch.countDown(); + Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS)); + + + ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator); + + // TODO: We need to figure out what to do with getCorePoolSize + assertEquals(expectedMax, threadPool.getCorePoolSize()); + assertEquals(expectedMax, threadPool.getMaximumPoolSize()); + assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize()); + } + + @Test + public void testThreadPoolInjection() throws Exception { + + ServerLocator serverLocator = new ServerLocatorImpl(false); + + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue()); + ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(1); + serverLocator.setThreadPools(threadPool, scheduledThreadPool); + + Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool"); + Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool"); + + Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise"); + initialise.setAccessible(true); + initialise.invoke(serverLocator); + + threadPoolField.setAccessible(true); + scheduledThreadPoolField.setAccessible(true); + + ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator); + ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator); + + assertEquals(threadPool, tpe); + assertEquals(scheduledThreadPool, stpe); + } + + @After + public void cleanup() { + // Resets the global thread pool properties back to default. + System.setProperties(systemProperties); + ActiveMQClient.initializeGlobalThreadPoolProperties(); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index a75b73c..41916fe 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -2403,8 +2403,10 @@ public abstract class ActiveMQTestBase extends Assert { protected void closeAllSessionFactories() { synchronized (sessionFactories) { for (ClientSessionFactory sf : sessionFactories) { - closeSessionFactory(sf); - assert sf.isClosed(); + if (!sf.isClosed()) { + closeSessionFactory(sf); + assert sf.isClosed(); + } } sessionFactories.clear(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e8f2f39/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java index 18ef280..a4ef71a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java @@ -16,8 +16,14 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -36,33 +42,61 @@ public class CoreClientTest extends ActiveMQTestBase { private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; - // Constants ----------------------------------------------------- + @Test + public void testCoreClientNetty() throws Exception { + testCoreClient(true, null); + } - // Attributes ---------------------------------------------------- + @Test + public void testCoreClientInVM() throws Exception { + testCoreClient(false, null); + } - // Static -------------------------------------------------------- + @Test + public void testCoreClientWithInjectedThreadPools() throws Exception { - // Constructors -------------------------------------------------- + ExecutorService threadPool = Executors.newCachedThreadPool(); + ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10); - // Public -------------------------------------------------------- + ServerLocator locator = createNonHALocator(false); + boolean setThreadPools = locator.setThreadPools(threadPool, scheduledThreadPool); - @Test - public void testCoreClientNetty() throws Exception { - testCoreClient(true); + assertTrue(setThreadPools); + testCoreClient(true, locator); + + threadPool.shutdown(); + scheduledThreadPool.shutdown(); + + threadPool.awaitTermination(60, TimeUnit.SECONDS); + scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS); } @Test - public void testCoreClientInVM() throws Exception { - testCoreClient(false); + public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception { + + int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize; + int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize; + + try { + ActiveMQClient.setGlobalThreadPoolProperties(2, 1); + ServerLocator locator = createNonHALocator(false); + testCoreClient(true, locator); + } + finally { + // restoring original value otherwise future tests would be screwed up + ActiveMQClient.setGlobalThreadPoolProperties(originalGlobal, originalScheduled); + ActiveMQClient.clearThreadPools(); + } } - private void testCoreClient(final boolean netty) throws Exception { + private void testCoreClient(final boolean netty, ServerLocator serverLocator) throws Exception { final SimpleString QUEUE = new SimpleString("CoreClientTestQueue"); ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(netty), false)); server.start(); - ServerLocator locator = createNonHALocator(netty); + + ServerLocator locator = serverLocator == null ? createNonHALocator(netty) : serverLocator; ClientSessionFactory sf = createSessionFactory(locator); @@ -105,5 +139,7 @@ public class CoreClientTest extends ActiveMQTestBase { message2.acknowledge(); } + + sf.close(); } }