activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r916780 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/TransportConnection.java broker/TransportConnector.java thread/DefaultThreadPools.java thread/TaskRunnerFactory.java transport/nio/SelectorManager.java
Date Fri, 26 Feb 2010 18:40:31 GMT
Author: chirino
Date: Fri Feb 26 18:40:31 2010
New Revision: 916780

URL: http://svn.apache.org/viewvc?rev=916780&view=rev
Log:
Updated the TransportConnector and TransportConnection to use a thread pool when initializing
and destroying connections to better support fast
connect and disconnect use cases.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 26 18:40:31 2010
@@ -77,6 +77,7 @@
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
 import org.apache.activemq.state.TransactionState;
+import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -91,6 +92,8 @@
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
 /**
  * @version $Revision: 1.8 $
  */
@@ -908,8 +911,7 @@
                 cs.getContext().getStopping().set(true);
             }
             try {
-                new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress())
{
-                    @Override
+                getDefaultTaskRunnerFactory().execute(new Runnable(){
                     public void run() {
                         serviceLock.writeLock().lock();
                         try {
@@ -922,7 +924,7 @@
                             serviceLock.writeLock().unlock();
                         }
                     }
-                }.start();
+                });
             } catch (Throwable t) {
                 LOG.warn("cannot create async transport stopper thread.. not waiting for
stop to complete, reason:", t);
                 stopped.countDown();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Fri Feb 26 18:40:31 2010
@@ -21,6 +21,7 @@
 import org.apache.activemq.broker.region.ConnectorStatistics;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
+import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -32,6 +33,9 @@
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -202,9 +206,7 @@
         server.setAcceptListener(new TransportAcceptListener() {
             public void onAccept(final Transport transport) {
                 try {
-                    // Starting the connection could block due to
-                    // wireformat negotiation, so start it in an async thread.
-                    Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress())
{
+                    getDefaultTaskRunnerFactory().execute(new Runnable(){
                         public void run() {
                             try {
                                 Connection connection = createConnection(transport);
@@ -214,8 +216,7 @@
                                 onAcceptError(e);
                             }
                         }
-                    };
-                    startThread.start();
+                    });
                 } catch (Exception e) {
                     String remoteHost = transport.getRemoteAddress();
                     ServiceSupport.dispose(transport);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
Fri Feb 26 18:40:31 2010
@@ -26,24 +26,24 @@
  */
 public final class DefaultThreadPools {
 
-    private static final Executor DEFAULT_POOL;
-    static {
-        DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
-    }    
+//    private static final Executor DEFAULT_POOL;
+//    static {
+//        DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+//            public Thread newThread(Runnable runnable) {
+//                Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
+//                thread.setDaemon(true);
+//                return thread;
+//            }
+//        });
+//    }    
     private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
     
     private DefaultThreadPools() {        
     }
     
-    public static Executor getDefaultPool() {
-        return DEFAULT_POOL;
-    }
+//    public static Executor getDefaultPool() {
+//        return DEFAULT_POOL;
+//    }
     
     public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
         return DEFAULT_TASK_RUNNER_FACTORY;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Fri Feb 26 18:40:31 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.thread;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -31,7 +32,7 @@
  * 
  * @version $Revision: 1.5 $
  */
-public class TaskRunnerFactory {
+public class TaskRunnerFactory implements Executor {
 
     private ExecutorService executor;
     private int maxIterationsPerRun;
@@ -80,6 +81,18 @@
         }
     }
 
+    public void execute(Runnable runnable) {
+        execute(runnable, "ActiveMQ Task");
+    }
+    
+    public void execute(Runnable runnable, String name) {
+        if (executor != null) {
+            executor.execute(runnable);
+        } else {
+            new Thread(runnable, name).start();
+        }
+    }
+
     protected ExecutorService createDefaultExecutor() {
         ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Fri Feb 26 18:40:31 2010
@@ -20,8 +20,11 @@
 import java.nio.channels.SocketChannel;
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The SelectorManager will manage one Selector and the thread that checks the
@@ -36,17 +39,21 @@
 
     public static final SelectorManager SINGLETON = new SelectorManager();
 
-    private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory()
{
-        public Thread newThread(Runnable r) {
-            Thread rc = new Thread(r);
-            rc.setName("NIO Transport Thread");
-            return rc;
-        }
-    });
+    private Executor selectorExecutor = createDefaultExecutor();
     private Executor channelExecutor = selectorExecutor;
     private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
     private int maxChannelsPerWorker = 64;
     
+    protected ExecutorService createDefaultExecutor() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                return new Thread(runnable, "ActiveMQ NIO Worker");
+            }
+        });
+        // rc.allowCoreThreadTimeOut(true);
+        return rc;
+    }
+    
     public static SelectorManager getInstance() {
         return SINGLETON;
     }



Mime
View raw message