Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 46257 invoked from network); 15 Dec 2010 12:45:19 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 15 Dec 2010 12:45:19 -0000 Received: (qmail 70937 invoked by uid 500); 15 Dec 2010 12:45:19 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 70867 invoked by uid 500); 15 Dec 2010 12:45:19 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 70859 invoked by uid 99); 15 Dec 2010 12:45:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Dec 2010 12:45:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Dec 2010 12:45:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 37C2A2388903; Wed, 15 Dec 2010 12:44:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1049527 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/discovery/simple/ main/java/org/apache/activemq/transport/vm/ test/java/org/apache/activemq/transport/vm/ Date: Wed, 15 Dec 2010 12:44:48 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101215124448.37C2A2388903@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Wed Dec 15 12:44:47 2010 New Revision: 1049527 URL: http://svn.apache.org/viewvc?rev=1049527&view=rev Log: additional broker side improvements for 
https://issues.apache.org/jira/browse/AMQ-2852 - have discovery and network connector and vm async tasks delegate to the default thread pool executor, serialized the test to ensure shutdown is called once after verification Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1049527&r1=1049526&r2=1049527&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Dec 15 12:44:47 2010 @@ -25,9 +25,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -68,6 +65,8 @@ import org.apache.activemq.command.Shutd import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; +import 
org.apache.activemq.thread.DefaultThreadPools; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; @@ -92,7 +91,7 @@ import org.apache.commons.logging.LogFac */ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { private static final Log LOG = LogFactory.getLog(DemandForwardingBridgeSupport.class); - private static final ThreadPoolExecutor ASYNC_TASKS; + private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory(); protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; protected final Transport localBroker; protected final Transport remoteBroker; @@ -251,7 +250,7 @@ public abstract class DemandForwardingBr } protected void triggerLocalStartBridge() throws IOException { - ASYNC_TASKS.execute(new Runnable() { + asyncTaskRunner.execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); @@ -267,7 +266,7 @@ public abstract class DemandForwardingBr } protected void triggerRemoteStartBridge() throws IOException { - ASYNC_TASKS.execute(new Runnable() { + asyncTaskRunner.execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker); @@ -391,7 +390,7 @@ public abstract class DemandForwardingBr try { remoteBridgeStarted.set(false); final CountDownLatch sendShutdown = new CountDownLatch(1); - ASYNC_TASKS.execute(new Runnable() { + asyncTaskRunner.execute(new Runnable() { public void run() { try { localBroker.oneway(new ShutdownInfo()); @@ -433,7 +432,7 @@ public abstract class DemandForwardingBr LOG.warn("Network connection between " + localBroker + " and " + 
remoteBroker + " shutdown due to a remote error: " + error); } LOG.debug("The remote Exception was: " + error, error); - ASYNC_TASKS.execute(new Runnable() { + asyncTaskRunner.execute(new Runnable() { public void run() { ServiceSupport.dispose(getControllingService()); } @@ -647,7 +646,7 @@ public abstract class DemandForwardingBr if (!disposed.get()) { LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); LOG.debug("The local Exception was:" + error, error); - ASYNC_TASKS.execute(new Runnable() { + asyncTaskRunner.execute(new Runnable() { public void run() { ServiceSupport.dispose(getControllingService()); } @@ -674,7 +673,7 @@ public abstract class DemandForwardingBr subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); // continue removal in separate thread to free up this thread for outstanding responses - ASYNC_TASKS.execute(new Runnable() { + asyncTaskRunner.execute(new Runnable() { public void run() { sub.waitForCompletion(); try { @@ -1277,15 +1276,4 @@ public abstract class DemandForwardingBr public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } - - static { - ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "NetworkBridge"); - thread.setDaemon(true); - return thread; - } - }); - } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1049527&r1=1049526&r2=1049527&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Wed Dec 15 12:44:47 2010 @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.commons.logging.Log; @@ -38,8 +39,7 @@ import org.apache.commons.logging.LogFac */ public class SimpleDiscoveryAgent implements DiscoveryAgent { - private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class); - private static final ThreadPoolExecutor ASYNC_TASKS; + private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class); private long initialReconnectDelay = 1000; private long maxReconnectDelay = 1000 * 30; private long backOffMultiplier = 2; @@ -110,14 +110,14 @@ public class SimpleDiscoveryAgent implem if (event.failed.compareAndSet(false, true)) { listener.onServiceRemove(event); - ASYNC_TASKS.execute(new Runnable() { + DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { public void run() { // We detect a failed connection attempt because the service // fails right // away. if (event.connectTime + minConnectTime > System.currentTimeMillis()) { - LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event); + LOG.debug("Failure occurred soon after the discovery event was generated. 
It will be classified as a connection failure: "+event); event.connectFailures++; @@ -132,7 +132,7 @@ public class SimpleDiscoveryAgent implem return; } - LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect."); + LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect."); sleepMutex.wait(event.reconnectDelay); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -163,7 +163,7 @@ public class SimpleDiscoveryAgent implem event.failed.set(false); listener.onServiceAdd(event); } - }); + }, "Simple Discovery Agent"); } } @@ -214,16 +214,4 @@ public class SimpleDiscoveryAgent implem public void setUseExponentialBackOff(boolean useExponentialBackOff) { this.useExponentialBackOff = useExponentialBackOff; } - - static { - ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Simple Discovery Agent: "+runnable); - thread.setDaemon(true); - return thread; - } - }); - } - - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1049527&r1=1049526&r2=1049527&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Dec 15 12:44:47 2010 @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlocki import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import 
org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -44,8 +45,6 @@ public class VMTransport implements Tran private static final Object DISCONNECT = new Object(); private static final AtomicLong NEXT_ID = new AtomicLong(0); - // still possible to configure dedicated task runner through system property but not programmatically - private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false); protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; @@ -331,7 +330,7 @@ public class VMTransport implements Tran if (async) { synchronized (lazyInitMutext) { if (taskRunner == null) { - taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString()); + taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); } } try { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java?rev=1049527&r1=1049526&r2=1049527&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java Wed Dec 15 12:44:47 2010 @@ -27,8 +27,8 @@ import org.apache.activemq.ActiveMQConne import org.apache.activemq.broker.BrokerService; import org.apache.activemq.bugs.embedded.ThreadExplorer; import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.thread.DefaultThreadPools; -import static org.apache.activemq.thread.DefaultThreadPools.shutdown; public class 
VmTransportNetworkBrokerTest extends TestCase { @@ -38,8 +38,10 @@ public class VmTransportNetworkBrokerTes CountDownLatch started = new CountDownLatch(1); CountDownLatch gotConnection = new CountDownLatch(1); - public void testNoThreadLeakWithActiveVMConnection() throws Exception { - + public void testNoThreadLeak() throws Exception { + + // with VMConnection and simple discovery network connector + int originalThreadCount = Thread.activeCount(); BrokerService broker = new BrokerService(); broker.setDedicatedTaskRunner(true); broker.setPersistent(false); @@ -55,43 +57,42 @@ public class VmTransportNetworkBrokerTes // let it settle TimeUnit.SECONDS.sleep(5); - int threadCount = Thread.activeCount(); + int threadCountAfterStart = Thread.activeCount(); TimeUnit.SECONDS.sleep(30); int threadCountAfterSleep = Thread.activeCount(); - assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" + threadCountAfterSleep, - threadCountAfterSleep < threadCount + 8); + assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" +threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, + threadCountAfterSleep < threadCountAfterStart + 8); connection.close(); broker.stop(); broker.waitUntilStopped(); - } - public void testNoDanglingThreadsAfterStop() throws Exception { + // testNoDanglingThreadsAfterStop with tcp transport - int threadCount = Thread.activeCount(); - BrokerService broker = new BrokerService(); + broker = new BrokerService(); broker.setSchedulerSupport(true); broker.setDedicatedTaskRunner(true); broker.setPersistent(false); broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); broker.start(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); 
- Connection connection = cf.createConnection("system", "manager"); + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); + connection = cf.createConnection("system", "manager"); connection.start(); connection.close(); broker.stop(); broker.waitUntilStopped(); - shutdown(); + + // must only be called when all brokers and connections are done! + DefaultThreadPools.shutdown(); // let it settle TimeUnit.SECONDS.sleep(5); int threadCountAfterStop = Thread.activeCount(); - assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop, - threadCountAfterStop == threadCount); + assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop, + threadCountAfterStop == originalThreadCount); } - }