activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r835921 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/...
Date Fri, 13 Nov 2009 17:04:20 GMT
Author: gtully
Date: Fri Nov 13 17:04:09 2009
New Revision: 835921

URL: http://svn.apache.org/viewvc?rev=835921&view=rev
Log:
merge -c 835874 - resolve https://issues.apache.org/activemq/browse/AMQ-2483 and https://issues.apache.org/activemq/browse/AMQ-2028,
keep track of outstanding wakeup requests in a queue with regular task runner avoids build
up in determintic task runner. Exposed useDeterministicTaskRunner to validate some test that
fail with the -DuseDedicatedTaskRunner=true system property. With broker.useDedicatedTask=false,
Queues will use pooled executor for dispatch.

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
      - copied unchanged from r835874, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Nov 13 17:04:09 2009
@@ -114,7 +114,7 @@
     protected boolean dispatchAsync=true;
     protected boolean alwaysSessionAsync = true;
 
-    private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session
Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
+    private TaskRunnerFactory sessionTaskRunner;
     private final ThreadPoolExecutor asyncConnectionThread;
 
     // Connection state variables
@@ -186,6 +186,7 @@
     private ConnectionAudit connectionAudit = new ConnectionAudit();
     private DestinationSource destinationSource;
     private final Object ensureConnectionInfoSentMutex = new Object();
+    private boolean useDedicatedTaskRunner;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -644,7 +645,9 @@
                     // factory
                     // then we may need to call
                     // factory.onConnectionClose(this);
-                    sessionTaskRunner.shutdown();
+                    if (sessionTaskRunner != null) {
+                        sessionTaskRunner.shutdown();
+                    }
                     closed.set(true);
                     closing.set(false);
                 }
@@ -927,7 +930,20 @@
         transportListeners.remove(transportListener);
     }
 
+    public boolean isUseDedicatedTaskRunner() {
+        return useDedicatedTaskRunner;
+    }
+    
+    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+    }
+
     public TaskRunnerFactory getSessionTaskRunner() {
+        synchronized (this) {
+            if (sessionTaskRunner == null) {
+                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION,
false, 1000, isUseDedicatedTaskRunner());
+            }
+        }
         return sessionTaskRunner;
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Fri Nov 13 17:04:09 2009
@@ -107,12 +107,13 @@
     private boolean watchTopicAdvisories = true;
     private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
     private long warnAboutUnstartedConnectionTimeout = 500L;
-    private int sendTimeout =0;
+    private int sendTimeout = 0;
     private boolean sendAcksAsync=true;
     private TransportListener transportListener;
-	private ExceptionListener exceptionListener;
-	private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
-	private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+    private ExceptionListener exceptionListener;
+    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+    private boolean useDedicatedTaskRunner;
 
     // /////////////////////////////////////////////
     //
@@ -313,6 +314,7 @@
         connection.setSendAcksAsync(isSendAcksAsync());
         connection.setAuditDepth(getAuditDepth());
         connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
+        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -903,4 +905,12 @@
 	public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
 		this.auditMaximumProducerNumber = auditMaximumProducerNumber;
 	}
+
+    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+    }
+    
+    public boolean isUseDedicatedTaskRunner() {
+        return useDedicatedTaskRunner;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Fri Nov 13 17:04:09 2009
@@ -91,6 +91,10 @@
                     if (taskRunner == null) {
                         synchronized (this) {
                             if (this.taskRunner == null) {
+                                if (!isRunning()) {
+                                    // stop has been called
+                                    return;
+                                }
                                 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
                                         "ActiveMQ Session: " + session.getSessionId());
                             }
@@ -142,11 +146,12 @@
     void stop() throws JMSException {
         try {
             if (messageQueue.isRunning()) {
-                messageQueue.stop();
-                TaskRunner taskRunner = this.taskRunner;
-                if (taskRunner != null) {
-                    this.taskRunner = null;
-                    taskRunner.shutdown();
+                synchronized(this) {
+                    messageQueue.stop();
+                    if (this.taskRunner != null) {
+                        this.taskRunner.shutdown();
+                        this.taskRunner = null;
+                    }
                 }
             }
         } catch (InterruptedException e) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Nov 13 17:04:09 2009
@@ -933,7 +933,7 @@
     public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
         if (taskRunnerFactory == null) {
             persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task",
persistenceThreadPriority,
-                    true, 1000);
+                    true, 1000, isDedicatedTaskRunner());
         }
         return persistenceTaskRunnerFactory;
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Nov 13 17:04:09 2009
@@ -899,21 +899,26 @@
             for (TransportConnectionState cs : connectionStates) {
                 cs.getContext().getStopping().set(true);
             }
-            new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
-                @Override
-                public void run() {
-                    serviceLock.writeLock().lock();
-                    try {
-                        doStop();
-                    } catch (Throwable e) {
-                        LOG.debug("Error occured while shutting down a connection to '" +
transport.getRemoteAddress()
-                                + "': ", e);
-                    } finally {
-                        stopped.countDown();
-                        serviceLock.writeLock().unlock();
+            try {
+                new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress())
{
+                    @Override
+                    public void run() {
+                        serviceLock.writeLock().lock();
+                        try {
+                            doStop();
+                        } catch (Throwable e) {
+                            LOG.debug("Error occured while shutting down a connection to
'" + transport.getRemoteAddress()
+                                    + "': ", e);
+                        } finally {
+                            stopped.countDown();
+                            serviceLock.writeLock().unlock();
+                        }
                     }
-                }
-            }.start();
+                }.start();
+            } catch (Throwable t) {
+                LOG.warn("cannot create async transport stopper thread.. not waiting for
stop to complete, reason:", t);
+                stopped.countDown();
+            }
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Nov 13 17:04:09 2009
@@ -31,9 +31,8 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -65,7 +64,6 @@
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.thread.DeterministicTaskRunner;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -86,7 +84,7 @@
  */
 public class Queue extends BaseDestination implements Task, UsageListener {
     protected static final Log LOG = LogFactory.getLog(Queue.class);
-    protected TaskRunnerFactory taskFactory;
+    protected final TaskRunnerFactory taskFactory;
     protected TaskRunner taskRunner;    
     protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
     protected PendingMessageCursor messages;
@@ -108,9 +106,11 @@
     private int timeBeforeDispatchStarts = 0;
     private int consumersBeforeDispatchStarts = 0;
     private CountDownLatch consumersBeforeStartsLatch;
+    private AtomicLong pendingWakeups = new AtomicLong();
+    
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
-            wakeup();
+            asyncWakeup();
         }
     };
     private final Runnable expireMessagesTask = new Runnable() {
@@ -164,26 +164,13 @@
         // since it turns into a shared blocking queue which can lead to a network deadlock.
 
         // If we are cursoring to disk..it's not and issue because it does not block due

         // to large disk sizes.
-        if( messages instanceof VMPendingMessageCursor ) {
+        if (messages instanceof VMPendingMessageCursor) {
             this.systemUsage = brokerService.getSystemUsage();
             memoryUsage.setParent(systemUsage.getMemoryUsage());
         }
         
-        if (isOptimizedDispatch()) {
-            this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
-        }else {
-            final Queue queue = this;
-            this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
-                public Thread newThread(Runnable runnable) {
-                    Thread thread = new QueueThread(runnable, "QueueThread:"+destination,
queue);
-                    thread.setDaemon(true);
-                    thread.setPriority(Thread.NORM_PRIORITY);
-                    return thread;
-                }
-            });
-               
-            this.taskRunner = new DeterministicTaskRunner(this.executor,this);
-        }
+        this.taskRunner =
+            taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
         
         super.initialize();
         if (store != null) {
@@ -591,6 +578,7 @@
             }
         };
         doBrowse(browsedMessages, this.getMaxExpirePageSize());
+        asyncWakeup();
     }
 
     public void gc(){
@@ -1190,8 +1178,8 @@
 	            } catch (Throwable e) {
 	                LOG.error("Failed to page in more queue messages ", e);
                 }
-	        }
-	        return !messagesWaitingForSpace.isEmpty();
+	        }        
+	        return pendingWakeups.decrementAndGet() > 0;
         }
     }
 
@@ -1297,7 +1285,6 @@
         } catch (IOException e) {
             LOG.error("Failed to remove expired Message from the store ",e);
         }
-        asyncWakeup();
     }
     
     protected ConnectionContext createConnectionContext() {
@@ -1336,14 +1323,16 @@
     public void wakeup() {
         if (optimizedDispatch || isSlave()) {
             iterate();
+            pendingWakeups.incrementAndGet();
         } else {
             asyncWakeup();
         }
     }
-    
-    public void asyncWakeup() {
+
+    private void asyncWakeup() {
         try {
-            this.taskRunner.wakeup();
+            pendingWakeups.incrementAndGet();
+            this.taskRunner.wakeup();    
         } catch (InterruptedException e) {
             LOG.warn("Async task tunner failed to wakeup ", e);
         }
@@ -1432,7 +1421,7 @@
                                 pagedInPendingDispatch.add(qmr);
                             }
                         }
-                        doWakeUp  = true;
+                        doWakeUp = true;
                     }
                 }
             }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
Fri Nov 13 17:04:09 2009
@@ -55,6 +55,7 @@
     private boolean recoverReferenceStore=true;
     private boolean forceRecoverReferenceStore=false;
     private long checkpointInterval = 1000 * 20;
+    private boolean useDedicatedTaskRunner;
 
 
     /**
@@ -109,13 +110,21 @@
         this.dataDirectory = dataDirectory;
     }
 
+    public boolean isUseDedicatedTaskRunner() {
+        return useDedicatedTaskRunner;
+    }
+    
+    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+    }
+    
     /**
      * @return the taskRunnerFactory
      */
     public TaskRunnerFactory getTaskRunnerFactory() {
         if (taskRunnerFactory == null) {
             taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
-                                                      true, 1000);
+                                                      true, 1000, isUseDedicatedTaskRunner());
         }
         return taskRunnerFactory;
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
Fri Nov 13 17:04:09 2009
@@ -53,6 +53,7 @@
     private boolean failIfJournalIsLocked;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
     private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+    private boolean useDedicatedTaskRunner;
 
     public PersistenceAdapter createPersistenceAdapter() throws IOException {
         jdbcPersistenceAdapter.setDataSource(getDataSource());
@@ -110,10 +111,18 @@
         this.useJournal = useJournal;
     }
 
+    public boolean isUseDedicatedTaskRunner() {
+        return useDedicatedTaskRunner;
+    }
+    
+    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+    }
+    
     public TaskRunnerFactory getTaskRunnerFactory() {
         if (taskRunnerFactory == null) {
             taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
-                                                      true, 1000);
+                                                      true, 1000, isUseDedicatedTaskRunner());
         }
         return taskRunnerFactory;
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Fri Nov 13 17:04:09 2009
@@ -43,7 +43,7 @@
         this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
     }
     
-    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun)
{
+    private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun)
{
     	this(name,priority,daemon,maxIterationsPerRun,false);
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Fri Nov 13 17:04:09 2009
@@ -333,7 +333,9 @@
             if (mcast != null) {
                 mcast.close();
             }
-            runner.interrupt();
+            if (runner != null) {
+                runner.interrupt();
+            }
             getExecutor().shutdownNow();
         }
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Fri Nov 13 17:04:09 2009
@@ -45,7 +45,8 @@
 
     private static final Object DISCONNECT = new Object();
     private static final AtomicLong NEXT_ID = new AtomicLong(0);
-    private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport",
Thread.NORM_PRIORITY, true, 1000);
+    // still possible to configure dedicated task runner through system property but not
programmatically
+    private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport",
Thread.NORM_PRIORITY, true, 1000, false);
     protected VMTransport peer;
     protected TransportListener transportListener;
     protected boolean disposed;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
Fri Nov 13 17:04:09 2009
@@ -55,7 +55,7 @@
 
 @SuppressWarnings("serial")
 class Trace extends Throwable {
-    public int count;
+    public int count = 1;
     public final int size;
     Trace() {
         super();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Fri Nov 13 17:04:09 2009
@@ -47,7 +47,7 @@
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -105,7 +105,7 @@
         final DestinationStatistics destinationStatistics = new DestinationStatistics();
         consumerInfo.setExclusive(true);
         final Queue queue = new Queue(brokerService, destination,
-                queueMessageStore, destinationStatistics, null);
+                queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
 
         // a workaround for this issue
         // queue.setUseCache(false);

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Fri Nov 13 17:04:09 2009
@@ -42,15 +42,17 @@
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
-import junit.framework.TestCase;
+import junit.framework.Test;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.util.ThreadTracker;
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,7 +61,7 @@
  * @version $Revision: 1.5 $
  * A Test case for AMQ-1479
  */
-public class DurableConsumerTest extends TestCase {
+public class DurableConsumerTest extends CombinationTestSupport {
     private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class);
     private static int COUNT = 1024*10;
     private static String CONSUMER_NAME = "DURABLE_TEST";
@@ -73,8 +75,8 @@
     
     private static final String TOPIC_NAME = "failoverTopic";
     private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
-   
-    
+    public boolean useDedicatedTaskRunner = false;
+       
     private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
 
         private TopicConnection topicConnection = null;
@@ -180,8 +182,7 @@
             final int id = i;
             Thread thread = new Thread( new Runnable() {
                 public void run() {
-                    
-                    SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL,
System.currentTimeMillis()+"-"+id, TOPIC_NAME);
+                    new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id,
TOPIC_NAME);
                 }
             } );
             thread.start();
@@ -196,7 +197,13 @@
         Thread.sleep(10000);
         assertEquals(0, exceptions.size());
     }
-  
+    
+    // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
+    // with use dedicatedTaskRunner=true and produce OOM
+    public void initCombosForTestConcurrentDurableConsumer() {
+        addCombinationValues("useDedicatedTaskRunner", new Object[] {Boolean.TRUE, Boolean.FALSE});
+    }
+    
     public void testConcurrentDurableConsumer() throws Exception {
     	
     	broker.start();
@@ -251,7 +258,7 @@
             }
         };
         
-        ExecutorService executor = Executors.newCachedThreadPool();
+        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
 
         for (int i=0; i<numConsumers ; i++) {
             executor.execute(consumer);
@@ -366,8 +373,7 @@
     }
 
     protected void tearDown() throws Exception {
-        super.tearDown();
-        
+        super.tearDown();      
         if (broker != null) {
             broker.stop();
             broker = null;
@@ -396,11 +402,20 @@
         answer.setUseShutdownHook(false);
         answer.setUseJmx(false);
         answer.setAdvisorySupport(false);
+        answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
     }
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory(bindAddress);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
+        factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
+        return factory;
     }
 
-   
+    public static Test suite() {
+        return suite(DurableConsumerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
Fri Nov 13 17:04:09 2009
@@ -32,7 +32,6 @@
 public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
 
     private static final Log LOG = LogFactory.getLog(DiscoveryTransportNoBrokerTest.class);
-    
 
     public void testNoExtraThreads() throws Exception {
         BrokerService broker = new BrokerService();
@@ -86,7 +85,7 @@
             connection.setClientID("test");
             fail("Did not fail to connect as expected.");
         }
-        catch ( JMSException expected ) { 
+        catch ( JMSException expected ) {
             assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause()
instanceof java.io.IOException);
         }
     }
@@ -107,10 +106,10 @@
             Connection connection = factory.createConnection();
             connection.setClientID("test");
             fail("Did not fail to connect as expected.");
-        } catch ( JMSException expected ) { 
+        } catch ( JMSException expected ) {
             assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause()
instanceof java.io.IOException);
             long duration = System.currentTimeMillis() - startT;
-            assertTrue("took at least initialReconnectDelay time: " + duration, duration
>= initialReconnectDelay);
+            assertTrue("took at least initialReconnectDelay time: " + duration + " e:" +
expected, duration >= initialReconnectDelay);
         }
     }
 }



Mime
View raw message