Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A13049046 for ; Thu, 6 Sep 2012 17:51:30 +0000 (UTC) Received: (qmail 66210 invoked by uid 500); 6 Sep 2012 17:51:30 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 66175 invoked by uid 500); 6 Sep 2012 17:51:30 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 66164 invoked by uid 99); 6 Sep 2012 17:51:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Sep 2012 17:51:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Sep 2012 17:51:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1711523888E4 for ; Thu, 6 Sep 2012 17:50:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1381695 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/ broker/region/ network/ network/jms/ store/journal/ store/kahadb/ thread/ transport/ transport/discovery/multicast/ transport/mqtt/ util/ Date: Thu, 06 Sep 2012 17:50:44 -0000 To: commits@activemq.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120906175045.1711523888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Thu Sep 6 17:50:43 2012 New Revision: 1381695 URL: http://svn.apache.org/viewvc?rev=1381695&view=rev Log: AMQ-4026: Using ThreadPoolUtils to shutdown thread pool. Use thread pool from broker service where applicable. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Sep 6 17:50:43 2012 @@ -103,6 +103,7 @@ import org.apache.activemq.util.Introspe import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -692,10 +693,10 @@ public class ActiveMQConnection implemen } finally { try { if (executor != null) { - executor.shutdown(); + ThreadPoolUtils.shutdown(executor); } } catch (Throwable e) { - LOG.error("Error shutting down thread pool " + e, e); + LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e); } ServiceSupport.dispose(this.transport); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Sep 6 17:50:43 2012 @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -61,6 +60,7 @@ import org.apache.activemq.transaction.S import org.apache.activemq.util.Callback; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -769,12 +769,8 @@ public class ActiveMQMessageConsumer imp } } if (executorService != null) { - executorService.shutdown(); - try { - executorService.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + ThreadPoolUtils.shutdownGraceful(executorService, 60000L); + executorService = null; } if (session.isClientAcknowledge()) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Sep 6 17:50:43 2012 @@ -111,6 +111,7 @@ import org.apache.activemq.util.IOHelper import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -769,7 +770,7 @@ public class BrokerService implements Se this.taskRunnerFactory = null; } if (this.executor != null) { - this.executor.shutdownNow(); + ThreadPoolUtils.shutdownNow(executor); this.executor = null; } @@ -2410,8 +2411,7 @@ public class BrokerService implements Se } if (networkConnectorStartExecutor != null) { // executor done when enqueued tasks are complete - networkConnectorStartExecutor.shutdown(); - networkConnectorStartExecutor = null; + ThreadPoolUtils.shutdown(networkConnectorStartExecutor); } for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { @@ -2755,7 +2755,7 @@ public class BrokerService implements Se /** * Sets whether Authenticated User Name information is shown in MBeans that support this field. - * @param true if MBeans should expose user name information. + * @param value if MBeans should expose user name information. */ public void setPopulateUserNameInMBeans(boolean value) { this.populateUserNameInMBeans = value; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 6 17:50:43 2012 @@ -86,6 +86,7 @@ import org.apache.activemq.transaction.S import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.BrokerSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -883,7 +884,8 @@ public class Queue extends BaseDestinati taskRunner.shutdown(); } if (this.executor != null) { - this.executor.shutdownNow(); + ThreadPoolUtils.shutdownNow(executor); + executor = null; } scheduler.cancel(expireMessagesTask); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Sep 6 17:50:43 2012 @@ -70,7 +70,6 @@ import org.apache.activemq.command.Shutd import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; @@ -92,7 +91,6 @@ import org.slf4j.LoggerFactory; */ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); - private TaskRunnerFactory asyncTaskRunner; protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; protected final Transport localBroker; protected final Transport remoteBroker; @@ -156,8 +154,10 @@ public abstract class DemandForwardingBr public void start() throws Exception { if (started.compareAndSet(false, true)) { - asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task"); - asyncTaskRunner.init(); + + if (brokerService == null) { + throw new IllegalArgumentException("BrokerService is null on " + this); + } localBroker.setTransportListener(new DefaultTransportListener() { @@ -201,7 +201,7 @@ public abstract class DemandForwardingBr } protected void triggerLocalStartBridge() throws IOException { - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); @@ -217,7 +217,7 @@ public abstract class DemandForwardingBr } protected void triggerRemoteStartBridge() throws IOException { - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker); @@ -350,7 +350,8 @@ public abstract class DemandForwardingBr try { remoteBridgeStarted.set(false); final CountDownLatch sendShutdown = new CountDownLatch(1); - asyncTaskRunner.execute(new Runnable() { + + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { try { localBroker.oneway(new ShutdownInfo()); @@ -363,7 +364,8 @@ public abstract class DemandForwardingBr } } - }); + }, "ActiveMQ ForwardingBridge StopTask"); + if (!sendShutdown.await(10, TimeUnit.SECONDS)) { LOG.info("Network Could not shutdown in a timely manner"); } @@ -377,9 +379,6 @@ public abstract class DemandForwardingBr startedLatch.countDown(); localStartedLatch.countDown(); - // stop task runner - asyncTaskRunner.shutdown(); - asyncTaskRunner = null; ss.throwFirstException(); } } @@ -399,7 +398,7 @@ public abstract class DemandForwardingBr LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); } LOG.debug("The remote Exception was: " + error, error); - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { ServiceSupport.dispose(getControllingService()); } @@ -632,7 +631,7 @@ public abstract class DemandForwardingBr if (!disposed.get()) { LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); LOG.debug("The local Exception was:" + error, error); - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { ServiceSupport.dispose(getControllingService()); } @@ -660,7 +659,7 @@ public abstract class DemandForwardingBr subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); // continue removal in separate thread to free up this thread for outstanding responses - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { sub.waitForCompletion(); try { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Thu Sep 6 17:50:43 2012 @@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQConne import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jndi.JndiTemplate; @@ -166,7 +167,8 @@ public abstract class JmsConnector imple public void stop() throws Exception { if (started.compareAndSet(true, false)) { - this.connectionSerivce.shutdown(); + ThreadPoolUtils.shutdown(connectionSerivce); + connectionSerivce = null; for (DestinationBridge bridge : inboundBridges) { bridge.stop(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Sep 6 17:50:43 2012 @@ -69,6 +69,7 @@ import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -287,7 +288,8 @@ public class JournalPersistenceAdapter i // Take one final checkpoint and stop checkpoint processing. checkpoint(true, true); checkpointTask.shutdown(); - checkpointExecutor.shutdown(); + ThreadPoolUtils.shutdown(checkpointExecutor); + checkpointExecutor = null; queues.clear(); topics.clear(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Sep 6 17:50:43 2012 @@ -64,6 +64,7 @@ import org.apache.activemq.store.kahadb. import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,10 +238,12 @@ public class KahaDBStore extends Message this.globalTopicSemaphore.drainPermits(); } if (this.queueExecutor != null) { - this.queueExecutor.shutdownNow(); + ThreadPoolUtils.shutdownNow(queueExecutor); + queueExecutor = null; } if (this.topicExecutor != null) { - this.topicExecutor.shutdownNow(); + ThreadPoolUtils.shutdownNow(topicExecutor); + topicExecutor = null; } LOG.info("Stopped KahaDB"); super.doStop(stopper); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Thu Sep 6 17:50:43 2012 @@ -92,9 +92,40 @@ public class TaskRunnerFactory implement } } + /** + * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively. + * + * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService) + */ public void shutdown() { if (executor != null) { - ThreadPoolUtils.shutdown(executor, shutdownAwaitTermination); + ThreadPoolUtils.shutdown(executor); + executor = null; + } + initDone.set(false); + } + + /** + * Performs a shutdown now (aggressively) on the thread pool. + * + * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService) + */ + public void shutdownNow() { + if (executor != null) { + ThreadPoolUtils.shutdownNow(executor); + executor = null; + } + initDone.set(false); + } + + /** + * Performs a graceful shutdown. + * + * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService) + */ + public void shutdownGraceful() { + if (executor != null) { + ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination); executor = null; } initDone.set(false); @@ -119,10 +150,19 @@ public class TaskRunnerFactory implement if (executor != null) { executor.execute(runnable); } else { - new Thread(runnable, name + "-" + id.incrementAndGet()).start(); + doExecuteNewThread(runnable, name); } } + private void doExecuteNewThread(Runnable runnable, String name) { + String threadName = name + "-" + id.incrementAndGet(); + Thread thread = new Thread(runnable, threadName); + thread.setDaemon(daemon); + + LOG.trace("Created and running thread[{}]: {}", threadName, thread); + thread.start(); + } + protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Thu Sep 6 17:50:43 2012 @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Reentr import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -364,7 +365,7 @@ public abstract class AbstractInactivity READ_CHECK_TIMER.cancel(); WRITE_CHECK_TIMER = null; READ_CHECK_TIMER = null; - ASYNC_TASKS.shutdown(); + ThreadPoolUtils.shutdown(ASYNC_TASKS); ASYNC_TASKS = null; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Thu Sep 6 17:50:43 2012 @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -348,7 +349,10 @@ public class MulticastDiscoveryAgent imp if (runner != null) { runner.interrupt(); } - getExecutor().shutdownNow(); + if (executor != null) { + ThreadPoolUtils.shutdownNow(executor); + executor = null; + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Thu Sep 6 17:50:43 2012 @@ -33,6 +33,7 @@ import org.apache.activemq.transport.Abs import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -276,7 +277,7 @@ public class MQTTInactivityMonitor exten if (CHECKER_COUNTER == 0) { READ_CHECK_TIMER.cancel(); READ_CHECK_TIMER = null; - ASYNC_TASKS.shutdown(); + ThreadPoolUtils.shutdown(ASYNC_TASKS); ASYNC_TASKS = null; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java?rev=1381695&r1=1381694&r2=1381695&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java Thu Sep 6 17:50:43 2012 @@ -24,27 +24,53 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * Utility methods for working with thread pools {@link ExecutorService}. */ -public class ThreadPoolUtils { +public final class ThreadPoolUtils { private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class); - // TODO: Should be 30 sec - // but lowered due some unit tests dont yet properly shutdown, so want to run these a bit faster - public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L; + public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L; + + /** + * Shutdown the given executor service only (ie not graceful shutdown). + * + * @see java.util.concurrent.ExecutorService#shutdown() + */ + public static void shutdown(ExecutorService executorService) { + doShutdown(executorService, -1, true); + } + + /** + * Shutdown now the given executor service aggressively. + * + * @param executorService the executor service to shutdown now + * @return list of tasks that never commenced execution + * @see java.util.concurrent.ExecutorService#shutdownNow() + */ + public static List shutdownNow(ExecutorService executorService) { + List answer = null; + if (!executorService.isShutdown()) { + LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); + answer = executorService.shutdownNow(); + if (LOG.isTraceEnabled()) { + LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", + new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); + } + } + + return answer; + } /** * Shutdown the given executor service graceful at first, and then aggressively * if the await termination timeout was hit. *

- * This implementation invokes the {@link #shutdown(java.util.concurrent.ExecutorService, long)} + * This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)} * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis. - * - * @see #shutdown(java.util.concurrent.ExecutorService, long) */ - public void shutdown(ExecutorService executorService) { - shutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION); + public static void shutdownGraceful(ExecutorService executorService) { + doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false); } /** @@ -57,14 +83,35 @@ public class ThreadPoolUtils { * forces a shutdown. The parameter shutdownAwaitTermination * is used as timeout value waiting for orderly shutdown to * complete normally, before going aggressively. + *

+ * Notice if the given parameter shutdownAwaitTermination is negative, then a quick shutdown + * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method + * and then exit from this method (ie. no graceful shutdown is performed). * * @param executorService the executor service to shutdown - * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown - * @see java.util.concurrent.ExecutorService#shutdown() + * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative + * then the thread pool is not graceful shutdown, but a regular shutdown + * is commenced. */ - public static void shutdown(ExecutorService executorService, long shutdownAwaitTermination) { + public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { + doShutdown(executorService, shutdownAwaitTermination, false); + } + + private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) { // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager + if (executorService == null) { + return; + } + + if (quick) { + // do not shutdown graceful, but just quick shutdown on the thread pool + executorService.shutdown(); + LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", + new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); + return; + } + if (shutdownAwaitTermination <= 0) { throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination); } @@ -106,27 +153,6 @@ public class ThreadPoolUtils { } /** - * Shutdown now the given executor service aggressively. - * - * @param executorService the executor service to shutdown now - * @return list of tasks that never commenced execution - * @see java.util.concurrent.ExecutorService#shutdownNow() - */ - public static List shutdownNow(ExecutorService executorService) { - List answer = null; - if (!executorService.isShutdown()) { - LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); - answer = executorService.shutdownNow(); - if (LOG.isTraceEnabled()) { - LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", - new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); - } - } - - return answer; - } - - /** * Awaits the termination of the thread pool. *

* This implementation will log every 5th second at INFO level that we are waiting, so the end user