From commits-return-12266-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Nov 13 15:22:32 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 3390 invoked from network); 13 Nov 2009 15:22:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 13 Nov 2009 15:22:32 -0000 Received: (qmail 40830 invoked by uid 500); 13 Nov 2009 15:22:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 40806 invoked by uid 500); 13 Nov 2009 15:22:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 40797 invoked by uid 99); 13 Nov 2009 15:22:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Nov 2009 15:22:32 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=AWL,BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Nov 2009 15:22:28 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B078023888D4; Fri, 13 Nov 2009 15:22:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r835874 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/j... 
Date: Fri, 13 Nov 2009 15:22:08 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091113152208.B078023888D4@eris.apache.org> Author: gtully Date: Fri Nov 13 15:22:07 2009 New Revision: 835874 URL: http://svn.apache.org/viewvc?rev=835874&view=rev Log: resolve https://issues.apache.org/activemq/browse/AMQ-2483 and https://issues.apache.org/activemq/browse/AMQ-2028, keep track of outstanding wakeup requests in a queue with regular task runner; this avoids build up in deterministic task runner. Exposed useDedicatedTaskRunner to validate some tests that fail with the -DuseDedicatedTaskRunner=true system property. With broker.useDedicatedTask=false, Queues will use pooled executor for dispatch. Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Nov 13 15:22:07 2009 @@ -114,7 +114,7 @@ protected boolean dispatchAsync=true; protected boolean alwaysSessionAsync = true; - private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000); + private TaskRunnerFactory sessionTaskRunner; private final ThreadPoolExecutor asyncConnectionThread; // Connection state variables @@ -186,6 +186,7 @@ private ConnectionAudit connectionAudit = new ConnectionAudit(); private DestinationSource destinationSource; private final Object ensureConnectionInfoSentMutex = new Object(); + private boolean useDedicatedTaskRunner; /** * Construct an ActiveMQConnection @@ -644,7 +645,9 @@ // factory // then we may need to call // factory.onConnectionClose(this); - sessionTaskRunner.shutdown(); + if (sessionTaskRunner != null) { + sessionTaskRunner.shutdown(); + } closed.set(true); closing.set(false); } @@ -927,7 +930,20 @@ transportListeners.remove(transportListener); } + public boolean isUseDedicatedTaskRunner() { + return 
useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + public TaskRunnerFactory getSessionTaskRunner() { + synchronized (this) { + if (sessionTaskRunner == null) { + sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); + } + } return sessionTaskRunner; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Fri Nov 13 15:22:07 2009 @@ -107,12 +107,13 @@ private boolean watchTopicAdvisories = true; private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; private long warnAboutUnstartedConnectionTimeout = 500L; - private int sendTimeout =0; + private int sendTimeout = 0; private boolean sendAcksAsync=true; private TransportListener transportListener; - private ExceptionListener exceptionListener; - private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; - private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + private ExceptionListener exceptionListener; + private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; + private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + private boolean useDedicatedTaskRunner; // ///////////////////////////////////////////// // @@ -313,6 +314,7 @@ connection.setSendAcksAsync(isSendAcksAsync()); connection.setAuditDepth(getAuditDepth()); 
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); + connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -903,4 +905,12 @@ public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { this.auditMaximumProducerNumber = auditMaximumProducerNumber; } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Fri Nov 13 15:22:07 2009 @@ -91,6 +91,10 @@ if (taskRunner == null) { synchronized (this) { if (this.taskRunner == null) { + if (!isRunning()) { + // stop has been called + return; + } this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + session.getSessionId()); } @@ -142,11 +146,12 @@ void stop() throws JMSException { try { if (messageQueue.isRunning()) { - messageQueue.stop(); - TaskRunner taskRunner = this.taskRunner; - if (taskRunner != null) { - this.taskRunner = null; - taskRunner.shutdown(); + synchronized(this) { + messageQueue.stop(); + if (this.taskRunner != null) { + this.taskRunner.shutdown(); + this.taskRunner = null; + } } } } catch (InterruptedException e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java 
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Nov 13 15:22:07 2009 @@ -933,7 +933,7 @@ public TaskRunnerFactory getPersistenceTaskRunnerFactory() { if (taskRunnerFactory == null) { persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, - true, 1000); + true, 1000, isDedicatedTaskRunner()); } return persistenceTaskRunnerFactory; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Nov 13 15:22:07 2009 @@ -899,21 +899,26 @@ for (TransportConnectionState cs : connectionStates) { cs.getContext().getStopping().set(true); } - new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) { - @Override - public void run() { - serviceLock.writeLock().lock(); - try { - doStop(); - } catch (Throwable e) { - LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() - + "': ", e); - } finally { - stopped.countDown(); - serviceLock.writeLock().unlock(); + try { + new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) { + @Override + public void run() { + 
serviceLock.writeLock().lock(); + try { + doStop(); + } catch (Throwable e) { + LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() + + "': ", e); + } finally { + stopped.countDown(); + serviceLock.writeLock().unlock(); + } } - } - }.start(); + }.start(); + } catch (Throwable t) { + LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); + stopped.countDown(); + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Nov 13 15:22:07 2009 @@ -31,9 +31,8 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -65,7 +64,6 @@ import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; -import org.apache.activemq.thread.DeterministicTaskRunner; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -86,7 +84,7 @@ */ public class Queue extends BaseDestination implements Task, UsageListener { protected static final Log LOG = LogFactory.getLog(Queue.class); - protected TaskRunnerFactory taskFactory; + 
protected final TaskRunnerFactory taskFactory; protected TaskRunner taskRunner; protected final List consumers = new ArrayList(50); protected PendingMessageCursor messages; @@ -108,9 +106,11 @@ private int timeBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0; private CountDownLatch consumersBeforeStartsLatch; + private AtomicLong pendingWakeups = new AtomicLong(); + private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { - wakeup(); + asyncWakeup(); } }; private final Runnable expireMessagesTask = new Runnable() { @@ -164,26 +164,13 @@ // since it turns into a shared blocking queue which can lead to a network deadlock. // If we are cursoring to disk..it's not and issue because it does not block due // to large disk sizes. - if( messages instanceof VMPendingMessageCursor ) { + if (messages instanceof VMPendingMessageCursor) { this.systemUsage = brokerService.getSystemUsage(); memoryUsage.setParent(systemUsage.getMemoryUsage()); } - if (isOptimizedDispatch()) { - this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName()); - }else { - final Queue queue = this; - this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new QueueThread(runnable, "QueueThread:"+destination, queue); - thread.setDaemon(true); - thread.setPriority(Thread.NORM_PRIORITY); - return thread; - } - }); - - this.taskRunner = new DeterministicTaskRunner(this.executor,this); - } + this.taskRunner = + taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); super.initialize(); if (store != null) { @@ -591,6 +578,7 @@ } }; doBrowse(browsedMessages, this.getMaxExpirePageSize()); + asyncWakeup(); } public void gc(){ @@ -1190,8 +1178,8 @@ } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } - } - return !messagesWaitingForSpace.isEmpty(); + } + return 
pendingWakeups.decrementAndGet() > 0; } } @@ -1297,7 +1285,6 @@ } catch (IOException e) { LOG.error("Failed to remove expired Message from the store ",e); } - asyncWakeup(); } protected ConnectionContext createConnectionContext() { @@ -1336,14 +1323,16 @@ public void wakeup() { if (optimizedDispatch || isSlave()) { iterate(); + pendingWakeups.incrementAndGet(); } else { asyncWakeup(); } } - - public void asyncWakeup() { + + private void asyncWakeup() { try { - this.taskRunner.wakeup(); + pendingWakeups.incrementAndGet(); + this.taskRunner.wakeup(); } catch (InterruptedException e) { LOG.warn("Async task tunner failed to wakeup ", e); } @@ -1432,7 +1421,7 @@ pagedInPendingDispatch.add(qmr); } } - doWakeUp = true; + doWakeUp = true; } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Fri Nov 13 15:22:07 2009 @@ -55,6 +55,7 @@ private boolean recoverReferenceStore=true; private boolean forceRecoverReferenceStore=false; private long checkpointInterval = 1000 * 20; + private boolean useDedicatedTaskRunner; /** @@ -109,13 +110,21 @@ this.dataDirectory = dataDirectory; } + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + /** * @return the taskRunnerFactory */ public TaskRunnerFactory getTaskRunnerFactory() { if (taskRunnerFactory == null) { 
taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, - true, 1000); + true, 1000, isUseDedicatedTaskRunner()); } return taskRunnerFactory; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Fri Nov 13 15:22:07 2009 @@ -53,6 +53,7 @@ private boolean failIfJournalIsLocked; private int journalThreadPriority = Thread.MAX_PRIORITY; private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + private boolean useDedicatedTaskRunner; public PersistenceAdapter createPersistenceAdapter() throws IOException { jdbcPersistenceAdapter.setDataSource(getDataSource()); @@ -110,10 +111,18 @@ this.useJournal = useJournal; } + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + public TaskRunnerFactory getTaskRunnerFactory() { if (taskRunnerFactory == null) { taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, - true, 1000); + true, 1000, isUseDedicatedTaskRunner()); } return taskRunnerFactory; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Fri Nov 13 15:22:07 2009 @@ -43,7 +43,7 @@ this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); } - public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { + private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { this(name,priority,daemon,maxIterationsPerRun,false); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Fri Nov 13 15:22:07 2009 @@ -333,7 +333,9 @@ if (mcast != null) { mcast.close(); } - runner.interrupt(); + if (runner != null) { + runner.interrupt(); + } getExecutor().shutdownNow(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=835874&r1=835873&r2=835874&view=diff 
============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Nov 13 15:22:07 2009 @@ -43,7 +43,8 @@ private static final Object DISCONNECT = new Object(); private static final AtomicLong NEXT_ID = new AtomicLong(0); - private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000); + // still possible to configure dedicated task runner through system property but not programmatically + private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false); protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java Fri Nov 13 15:22:07 2009 @@ -55,7 +55,7 @@ @SuppressWarnings("serial") class Trace extends Throwable { - public int count; + public int count = 1; public final int size; Trace() { super(); Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java?rev=835874&view=auto ============================================================================== 
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java Fri Nov 13 15:22:07 2009 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker; + +import junit.framework.Test; + +public class DedicatedTaskRunnerBrokerTest extends BrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setDedicatedTaskRunner(true); + return broker; + } + + public static Test suite() { + return suite(DedicatedTaskRunnerBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java ------------------------------------------------------------------------------ svn:executable = * Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Fri Nov 13 15:22:07 2009 @@ -47,7 +47,7 @@ import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; -import 
org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -105,7 +105,7 @@ final DestinationStatistics destinationStatistics = new DestinationStatistics(); consumerInfo.setExclusive(true); final Queue queue = new Queue(brokerService, destination, - queueMessageStore, destinationStatistics, null); + queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory()); // a workaround for this issue // queue.setUseCache(false); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=835874&r1=835873&r2=835874&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Fri Nov 13 15:22:07 2009 @@ -42,13 +42,15 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import junit.framework.TestCase; +import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; +import org.apache.activemq.util.ThreadTracker; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +59,7 @@ * @version $Revision: 1.5 $ * A Test case for AMQ-1479 */ -public class DurableConsumerTest extends TestCase { +public class DurableConsumerTest extends CombinationTestSupport { 
private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class); private static int COUNT = 1024*10; private static String CONSUMER_NAME = "DURABLE_TEST"; @@ -71,8 +73,8 @@ private static final String TOPIC_NAME = "failoverTopic"; private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; - - + public boolean useDedicatedTaskRunner = false; + private class SimpleTopicSubscriber implements MessageListener, ExceptionListener { private TopicConnection topicConnection = null; @@ -176,8 +178,7 @@ final int id = i; Thread thread = new Thread( new Runnable() { public void run() { - - SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME); + new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME); } } ); thread.start(); @@ -192,7 +193,13 @@ Thread.sleep(10000); assertEquals(0, exceptions.size()); } - + + // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028 + // with use dedicatedTaskRunner=true and produce OOM + public void initCombosForTestConcurrentDurableConsumer() { + addCombinationValues("useDedicatedTaskRunner", new Object[] {Boolean.TRUE, Boolean.FALSE}); + } + public void testConcurrentDurableConsumer() throws Exception { broker.start(); @@ -247,7 +254,7 @@ } }; - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executor = Executors.newFixedThreadPool(numConsumers); for (int i=0; i= initialReconnectDelay); + assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay); } } }