activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1303544 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/BrokerService.java thread/TaskRunnerFactory.java transport/nio/SelectorManager.java
Date Wed, 21 Mar 2012 19:55:31 GMT
Author: tabish
Date: Wed Mar 21 19:55:30 2012
New Revision: 1303544

URL: http://svn.apache.org/viewvc?rev=1303544&view=rev
Log:
fis for: https://issues.apache.org/jira/browse/AMQ-3718

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1303544&r1=1303543&r2=1303544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Wed Mar 21 19:55:30 2012
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -2371,13 +2373,35 @@ public class BrokerService implements Se
 
     protected synchronized ThreadPoolExecutor getExecutor() {
         if (this.executor == null) {
-        this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "Usage Async Task");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
+
+                private long i = 0;
+
+                @Override
+                public Thread newThread(Runnable runnable) {
+                    this.i++;
+                    Thread thread = new Thread(runnable, "BrokerService.worker." + this.i);
+                    thread.setDaemon(true);
+                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
+                        @Override
+                        public void uncaughtException(final Thread t, final Throwable e)
{
+                            LOG.error("Error in thread '{}'", t.getName(), e);
+                        }
+                    });
+                    return thread;
+                }
+            }, new RejectedExecutionHandler() {
+                @Override
+                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor
executor) {
+                    try {
+                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
+                    }
+
+                    throw new RejectedExecutionException("Timed Out while attempting to enqueue
Task.");
+                }
+            });
         }
         return this.executor;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1303544&r1=1303543&r2=1303544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Wed Mar 21 19:55:30 2012
@@ -48,9 +48,9 @@ public class TaskRunnerFactory implement
     public TaskRunnerFactory() {
         this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
     }
-    
+
     private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun)
{
-    	this(name,priority,daemon,maxIterationsPerRun,false);
+        this(name,priority,daemon,maxIterationsPerRun,false);
     }
 
     public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun,
boolean dedicatedTaskRunner) {
@@ -92,7 +92,7 @@ public class TaskRunnerFactory implement
     public void execute(Runnable runnable) {
         execute(runnable, "ActiveMQ Task");
     }
-    
+
     public void execute(Runnable runnable, String name) {
         init();
         if (executor != null) {
@@ -103,7 +103,7 @@ public class TaskRunnerFactory implement
     }
 
     protected ExecutorService createDefaultExecutor() {
-        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
                 thread.setDaemon(daemon);
@@ -111,7 +111,6 @@ public class TaskRunnerFactory implement
                 return thread;
             }
         });
-        // rc.allowCoreThreadTimeOut(true);
         return rc;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=1303544&r1=1303543&r2=1303544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Wed Mar 21 19:55:30 2012
@@ -29,11 +29,9 @@ import java.util.concurrent.TimeUnit;
 /**
  * The SelectorManager will manage one Selector and the thread that checks the
  * selector.
- * 
+ *
  * We may need to consider running more than one thread to check the selector if
  * servicing the selector takes too long.
- * 
- * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
  */
 public final class SelectorManager {
 
@@ -43,28 +41,31 @@ public final class SelectorManager {
     private Executor channelExecutor = selectorExecutor;
     private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
     private int maxChannelsPerWorker = 1024;
-    
+
     protected ExecutorService createDefaultExecutor() {
-        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
+
+            private long i = 0;
+
             public Thread newThread(Runnable runnable) {
-                return new Thread(runnable, "ActiveMQ NIO Worker");
+                this.i++;
+                final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
+                return t;
             }
         });
-        // rc.allowCoreThreadTimeOut(true);
+
         return rc;
     }
-    
+
     public static SelectorManager getInstance() {
         return SINGLETON;
     }
 
     public interface Listener {
         void onSelect(SelectorSelection selector);
-
         void onError(SelectorSelection selection, Throwable error);
     }
 
-
     public synchronized SelectorSelection register(SocketChannel socketChannel, Listener
listener)
         throws IOException {
 
@@ -78,7 +79,6 @@ public final class SelectorManager {
                     worker.retain();
                     selection = new SelectorSelection(worker, socketChannel, listener);
                 }
-                
             } else {
                 // Worker starts /w retain count of 1
                 SelectorWorker worker = new SelectorWorker(this);
@@ -86,7 +86,7 @@ public final class SelectorManager {
                 selection = new SelectorSelection(worker, socketChannel, listener);
             }
         }
-        
+
         return selection;
     }
 
@@ -125,5 +125,4 @@ public final class SelectorManager {
     public void setSelectorExecutor(Executor selectorExecutor) {
         this.selectorExecutor = selectorExecutor;
     }
-
 }



Mime
View raw message