activemq-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] michaelandrepearce commented on a change in pull request #2524: ARTEMIS-2240 ActiveMQThreadPoolExecutor should use LinkedTransferQueue
Date Tue, 29 Jan 2019 21:08:14 GMT
michaelandrepearce commented on a change in pull request #2524: ARTEMIS-2240 ActiveMQThreadPoolExecutor should use LinkedTransferQueue
URL: https://github.com/apache/activemq-artemis/pull/2524#discussion_r252017839
 
 

 ##########
 File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
 ##########
 @@ -29,125 +30,58 @@
  * and will be removed after idling for a specified keep time.
  * But in contrast to a standard cached executor, tasks are queued if the
  * maximum pool size if reached, instead of rejected.
- *
- * This is achieved by using a specialized blocking queue, which checks the
- * state of the associated executor in the offer method to decide whether to
- * queue a task or have the executor create another thread.
- *
- * Since the thread pool's execute method is reentrant, more than one caller
- * could try to offer a task into the queue. There is a small chance that
- * (a few) more threads are created as it should be limited by max pool size.
- * To allow for such a case not to reject a task, the underlying thread pool
- * executor is not limited. Only the offer method checks the configured limit.
  */
 public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
 
-   @SuppressWarnings("serial")
-   private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
+   /**
+    * The default rejected execution handler
+    */
+   private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
 
-      private ActiveMQThreadPoolExecutor executor = null;
+   // Handler executed when a task is submitted and a new thread cannot be created (because maxSize was reached)
+   // It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue class below)
+   private static class QueueExecutionHandler implements RejectedExecutionHandler {
 
-      // keep track of the difference between the number of idle threads and
-      // the number of queued tasks. If the delta is > 0, we have more
-      // idle threads than queued tasks and can add more tasks into the queue.
-      // The delta is incremented if a thread becomes idle or if a task is taken from the queue.
-      // The delta is decremented if a thread leaves idle state or if a task is added to the queue.
-      private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, "threadTaskDelta");
-      private volatile int threadTaskDelta = 0;
+      private final RejectedExecutionHandler handler;
 
-      public void setExecutor(ActiveMQThreadPoolExecutor executor) {
-         this.executor = executor;
+      private QueueExecutionHandler(RejectedExecutionHandler handler) {
+         Objects.requireNonNull(handler);
+         this.handler = handler;
       }
 
       @Override
-      public boolean offer(Runnable runnable) {
-         boolean retval = false;
-
-         if (threadTaskDelta > 0 || (executor.getPoolSize() >= executor.getMaximumPoolSize())) {
-            // A new task will be added to the queue if the maximum number of threads has been reached
-            // or if the delta is > 0, which means that there are enough idle threads.
-
-            retval = super.offer(runnable);
-
-            // Only decrement the delta if the task has actually been added to the queue
-            if (retval)
-               DELTA_UPDATER.decrementAndGet(this);
+      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+         if (executor.isShutdown() || !executor.getQueue().add(r)) {
+            handler.rejectedExecution(r, executor);
          }
-
-         return retval;
       }
+   }
 
-      @Override
-      public Runnable take() throws InterruptedException {
-         // Increment the delta as a thread becomes idle
-         // by waiting for a task to take from the queue
-         DELTA_UPDATER.incrementAndGet(this);
-
-
-         Runnable runnable = null;
+   // A specialized LinkedBlockingQueue that takes new elements by calling add() but not offer()
 
 Review comment:
   @franz1981 
   
   The test is run 20 times without restarting the process. The times below are the average in microseconds (us) for each run of 100,000. It was tested on a stock Dell R630, with persistence disabled so that disk I/O does not affect the results.
   
   commit 1)
   
   175
   165
   66
   65
   62
   60
   98
   98
   45
   45
   50
   50
   45
   52
   44
   44
   46
   47
   44
   
   latest version)
   
   165
   161
   69
   70
   52
   51
   104
   47
   45
   45
   53
   53
   51
   44
   42
   42
   43
   44
   45
   
   I think this looks good to merge.
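
For readers following the diff quoted above, the queue-on-rejection idea can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: the class `QueueOnRejectSketch`, the queue `AddOnlyQueue`, the handler `QueueingHandler`, and the helper `runTasks` are hypothetical names, and the sketch assumes a plain `LinkedBlockingQueue` whose `offer()` always refuses, so that the executor prefers starting a thread and only queues once the maximum pool size is reached.

```java
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch class; names are illustrative, not taken from the PR.
class QueueOnRejectSketch {

   // offer() always refuses, so ThreadPoolExecutor.execute() falls through to
   // starting a new thread. add() is the only way to enqueue a task.
   static final class AddOnlyQueue extends LinkedBlockingQueue<Runnable> {
      @Override
      public boolean offer(Runnable r) {
         return false;
      }

      @Override
      public boolean add(Runnable r) {
         return super.offer(r);
      }
   }

   // Called when no new thread can be started (maximum pool size reached).
   // Queues the task instead of rejecting it; delegates to the fallback
   // handler if the executor is shut down or the queue refuses the task.
   static final class QueueingHandler implements RejectedExecutionHandler {
      private final RejectedExecutionHandler fallback;

      QueueingHandler(RejectedExecutionHandler fallback) {
         this.fallback = Objects.requireNonNull(fallback);
      }

      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
         if (executor.isShutdown() || !executor.getQueue().add(r)) {
            fallback.rejectedExecution(r, executor);
         }
      }
   }

   // Submits `tasks` trivial tasks to a pool capped at `maxThreads` threads
   // and returns how many of them ran before the pool terminated.
   static int runTasks(int maxThreads, int tasks) {
      ThreadPoolExecutor executor = new ThreadPoolExecutor(
         0, maxThreads, 60L, TimeUnit.SECONDS,
         new AddOnlyQueue(),
         new QueueingHandler(new ThreadPoolExecutor.AbortPolicy()));
      AtomicInteger done = new AtomicInteger();
      for (int i = 0; i < tasks; i++) {
         executor.execute(done::incrementAndGet);
      }
      executor.shutdown(); // already queued tasks are still executed
      try {
         executor.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
      return done.get();
   }
}
```

With this sketch, a call such as `QueueOnRejectSketch.runTasks(2, 50)` is expected to run all 50 tasks on at most two threads, because tasks that cannot get a new thread are queued rather than rejected. Note that a queue whose `offer()` always refuses will start a new thread even when an idle one is available, as long as the pool is below its maximum size.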

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
