activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1451 - Remove synchronization on ActiveMQThreadPoolExecutor
Date Thu, 05 Oct 2017 21:06:29 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9065e49c4 -> 37135617a


ARTEMIS-1451 - Remove synchronization on ActiveMQThreadPoolExecutor


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f8b758d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f8b758d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f8b758d1

Branch: refs/heads/master
Commit: f8b758d14bc4dac7d613e1d15a65d31289f0a587
Parents: 9065e49
Author: barreiro <lbbbarreiro@gmail.com>
Authored: Thu Oct 5 21:21:52 2017 +0100
Committer: barreiro <lbbbarreiro@gmail.com>
Committed: Thu Oct 5 21:30:31 2017 +0100

----------------------------------------------------------------------
 .../utils/ActiveMQThreadPoolExecutor.java       | 147 ++-----------------
 1 file changed, 15 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8b758d1/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
index c3b1988..ed5f4ef 100755
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.utils;
 
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -28,151 +29,33 @@ import java.util.concurrent.TimeUnit;
  * and will be removed after idling for a specified keep time.
  * But in contrast to a standard cached executor, tasks are queued if the
  * maximum pool size if reached, instead of rejected.
- *
- * This is achieved by using a specialized blocking queue, which checks the
- * state of the associated executor in the offer method to decide whether to
- * queue a task or have the executor create another thread.
- *
- * Since the thread pool's execute method is reentrant, more than one caller
- * could try to offer a task into the queue. There is a small chance that
- * (a few) more threads are created as it should be limited by max pool size.
- * To allow for such a case not to reject a task, the underlying thread pool
- * executor is not limited. Only the offer method checks the configured limit.
  */
 public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
 
-   @SuppressWarnings("serial")
-   private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
-
-      private ActiveMQThreadPoolExecutor executor = null;
-
-      // lock object to synchronize on
-      private final Object lock = new Object();
-
-      // keep track of the difference between the number of idle threads and
-      // the number of queued tasks. If the delta is > 0, we have more
-      // idle threads than queued tasks and can add more tasks into the queue.
-      // The delta is incremented if a thread becomes idle or if a task is taken from the
queue.
-      // The delta is decremented if a thread leaves idle state or if a task is added to
the queue.
-      private int threadTaskDelta = 0;
-
-      public void setExecutor(ActiveMQThreadPoolExecutor executor) {
-         this.executor = executor;
+   // Handler executed when a task is submitted and a new thread cannot be created (because
maxSize was reached)
+   // It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue
class below)
+   private static final RejectedExecutionHandler QUEUE_EXECUTION_HANDLER = (r, e) -> {
+      if (!e.isShutdown()) {
+         e.getQueue().add(r);
       }
+   };
 
-      @Override
-      public boolean offer(Runnable runnable) {
-         boolean retval = false;
-
-         // Need to lock for 2 reasons:
-         // 1. to safely handle poll timeouts
-         // 2. to protect the delta from parallel updates
-         synchronized (lock) {
-            if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta
> 0)) {
-               // A new task will be added to the queue if the maximum number of threads
has been reached
-               // or if the delta is > 0, which means that there are enough idle threads.
-
-               retval = super.offer(runnable);
-
-               // Only decrement the delta if the task has actually been added to the queue
-               if (retval)
-                  threadTaskDelta--;
-            }
-         }
-
-         return retval;
-      }
+   // A specialized LinkedBlockingQueue that takes new elements by calling add() but not
offer()
+   // This is to force the ThreadPoolExecutor to always create new threads and never queue
+   private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
 
       @Override
-      public Runnable take() throws InterruptedException {
-         // Increment the delta as a thread becomes idle
-         // by waiting for a task to take from the queue
-         synchronized (lock) {
-            threadTaskDelta++;
-         }
-
-         Runnable runnable = null;
-
-         try {
-            runnable = super.take();
-            return runnable;
-         } finally {
-            // Now the thread is no longer idle waiting for a task
-            // If it had taken a task, the delta remains the same
-            // (decremented by the thread and incremented by the taken task)
-            // Only if no task had been taken, we have to decrement the delta.
-            if (runnable == null) {
-               synchronized (lock) {
-                  threadTaskDelta--;
-               }
-            }
-         }
+      public boolean offer(Runnable runnable) {
+         return false;
       }
 
       @Override
-      public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException {
-         // Increment the delta as a thread becomes idle
-         // by waiting for a task to poll from the queue
-         synchronized (lock) {
-            threadTaskDelta++;
-         }
-
-         Runnable runnable = null;
-         boolean timedOut = false;
-
-         try {
-            runnable = super.poll(arg0, arg2);
-            timedOut = (runnable == null);
-         } finally {
-            // Now the thread is no longer idle waiting for a task
-            // If it had taken a task, the delta remains the same
-            // (decremented by the thread and incremented by the taken task)
-            if (runnable == null) {
-               synchronized (lock) {
-                  // If the poll called timed out, we check again within a synchronized block
-                  // to make sure all offer calls have been completed.
-                  // This is to handle a newly queued task if the timeout occurred while
an offer call
-                  // added that task to the queue instead of creating a new thread.
-                  if (timedOut)
-                     runnable = super.poll();
-
-                  // Only if no task had been taken (either no timeout, or no task from after-timeout
poll),
-                  // we have to decrement the delta.
-                  if (runnable == null)
-                     threadTaskDelta--;
-               }
-            }
-         }
-
-         return runnable;
+      public boolean add(Runnable runnable) {
+         return super.offer( runnable );
       }
    }
 
-   private int maxPoolSize;
-
    public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits,
ThreadFactory factory) {
-      this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
-   }
-
-   // private constructor is needed to inject 'this' into the ThreadPoolQueue instance
-   private ActiveMQThreadPoolExecutor(int coreSize,
-                                      int maxSize,
-                                      long keep,
-                                      TimeUnit keepUnits,
-                                      ThreadPoolQueue myQueue,
-                                      ThreadFactory factory) {
-      super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory);
-      maxPoolSize = maxSize;
-      myQueue.setExecutor(this);
-   }
-
-   @Override
-   public int getMaximumPoolSize() {
-      return maxPoolSize;
-   }
-
-   @Override
-   public void setMaximumPoolSize(int maxSize) {
-      maxPoolSize = maxSize;
+      super( coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory, QUEUE_EXECUTION_HANDLER
);
    }
 }


Mime
View raw message