activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r428339 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/thread/ test/java/org/apache/activemq/usecases/
Date Thu, 03 Aug 2006 11:16:19 GMT
Author: jstrachan
Date: Thu Aug  3 04:16:18 2006
New Revision: 428339

URL: http://svn.apache.org/viewvc?rev=428339&view=rev
Log:
Added a modified version of the patch donated to LINGO-22 (submitted by Jim Beattie), which
tests that we leave no threads around after shutting down the JMS client and broker, together
with making the Session Executor / TaskRunnerFactory part of the connection. We could still
make it a singleton if required and just use reference counting to ensure it is shut down
properly after all connections are closed.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Aug  3 04:16:18 2006
@@ -97,7 +97,7 @@
 
 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection,
StatsCapable, Closeable,  StreamConnection, TransportListener {
 
-    public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("ActiveMQ
Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+    private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
     private final ThreadPoolExecutor asyncConnectionThread;
 
     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
@@ -572,9 +572,10 @@
 
                     started.set(false);
 
-                    // TODO : ActiveMQConnectionFactory.onConnectionClose() not
-                    // yet implemented.
+                    // TODO if we move the TaskRunnerFactory to the connection factory
+                    // then we may need to call
                     // factory.onConnectionClose(this);
+                    sessionTaskRunner.shutdown();
 
                     closed.set(true);
                     closing.set(false);
@@ -856,6 +857,15 @@
     public void removeTransportListener(TransportListener transportListener) {
         transportListeners.remove(transportListener);
     }
+    
+    public TaskRunnerFactory getSessionTaskRunner() {
+        return sessionTaskRunner;
+    }
+
+    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
+        this.sessionTaskRunner = sessionTaskRunner;
+    }
+
     
     // Implementation methods
     // -------------------------------------------------------------------------

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Thu Aug  3 04:16:18 2006
@@ -102,7 +102,7 @@
         if( !messageQueue.isRunning() ) {
             messageQueue.start();
             if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) {
-                taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this,
"ActiveMQ Session: "+session.getSessionId());
+                taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: "+session.getSessionId());
             }
             wakeup();
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Thu Aug  3 04:16:18 2006
@@ -18,6 +18,7 @@
 package org.apache.activemq.thread;
 
 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
@@ -37,7 +38,7 @@
  */
 public class TaskRunnerFactory {
 
-    private Executor executor;
+    private ExecutorService executor;
     private int maxIterationsPerRun;
     private String name;
     private int priority;
@@ -61,9 +62,14 @@
         } else {
             executor = createDefaultExecutor();
         }
-    
     }
 
+    public void shutdown() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+    
     public TaskRunner createTaskRunner(Task task, String name) {
         if( executor!=null ) {
             return new PooledTaskRunner(executor, task, maxIterationsPerRun);
@@ -72,8 +78,7 @@
         }
     }
     
-    protected Executor createDefaultExecutor() {
-        
+    protected ExecutorService createDefaultExecutor() {
         ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
new SynchronousQueue(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, name);
@@ -84,7 +89,6 @@
         });
         rc.allowCoreThreadTimeOut(true);
         return rc;
-            
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java?rev=428339&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
Thu Aug  3 04:16:18 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.spring.ConsumerBean;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ * @version $Revision: $
+ */
+public class StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest extends TestCase {
+
+    public static interface Task {
+        public void execute() throws Exception;
+    }
+
+    public void setUp() throws Exception {
+    }
+
+    public void testStartAndStopClientAndBrokerAndCheckNoThreadsAreLeft() throws Exception
{
+        runTest(new Task() {
+
+            public void execute() throws Exception {
+                BrokerService broker = new BrokerService();
+                broker.setPersistent(false);
+                broker.start();
+
+                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                Connection connection = factory.createConnection();
+                connection.start();
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue destination = session.createQueue(getName());
+
+                // consumer
+                MessageConsumer consumer = session.createConsumer(destination);
+                ConsumerBean listener = new ConsumerBean();
+                consumer.setMessageListener(listener);
+
+                // producer
+                MessageProducer producer = session.createProducer(destination);
+                TextMessage message = session.createTextMessage("Hello World!");
+                producer.send(message);
+                producer.close();
+
+                listener.assertMessagesArrived(1);
+
+                consumer.close();
+                session.close();
+                connection.close();
+
+                broker.stop();
+            }
+        });
+    }
+
+    public void runTest(Task task) throws Exception {
+        int numThreads = Thread.currentThread().getThreadGroup().activeCount();
+        Thread.currentThread().getThreadGroup().list();
+
+        task.execute();
+
+        Thread.yield();
+        Thread.sleep(2000); // Wait for the threads to exit on their own
+
+        Thread.currentThread().getThreadGroup().list();
+        assertEquals(numThreads, Thread.currentThread().getThreadGroup().activeCount());
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message