activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: Change to a lock free ordered executor
Date Fri, 18 Dec 2015 03:17:29 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 90c946970 -> 31748a793


Change to a lock free ordered executor


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/631c2fa7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/631c2fa7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/631c2fa7

Branch: refs/heads/master
Commit: 631c2fa78054d12aa164ae57b9d35c26634a7e21
Parents: 90c9469
Author: Stuart Douglas <stuart.w.douglas@gmail.com>
Authored: Fri Dec 18 09:21:34 2015 +0900
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Dec 17 22:17:08 2015 -0500

----------------------------------------------------------------------
 .../artemis/utils/OrderedExecutorFactory.java   | 110 +++++++++----------
 1 file changed, 52 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/631c2fa7/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
index 7475526..18db9c7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.utils;
 
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
@@ -54,78 +56,70 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
     * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after
another call A to the
     * same method, will result in B's task running after A's.
     */
-   private static final class OrderedExecutor implements Executor {
+   private static class OrderedExecutor implements Executor {
 
-      private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
+      private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
+      private final Executor delegate;
+      private final ExecutorTask task = new ExecutorTask();
 
-      // @protected by tasks
-      private boolean running;
+      private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater
= AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
 
-      private final Executor parent;
+      private static final int STATE_NOT_RUNNING = 0;
+      private static final int STATE_RUNNING = 1;
 
-      private final Runnable runner;
+      public OrderedExecutor(Executor delegate) {
+         this.delegate = delegate;
+      }
+
+
+      @Override
+      public void execute(Runnable command) {
+         tasks.add(command);
+         if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+            //note that this can result in multiple tasks being queued
+            //this is not an issue as the CAS will mean that the second (and subsequent)
execution is ignored
+            delegate.execute(task);
+         }
+      }
 
-      /**
-       * Construct a new instance.
-       *
-       * @param parent the parent executor
-       */
-      public OrderedExecutor(final Executor parent) {
-         this.parent = parent;
-         runner = new Runnable() {
-            @Override
-            public void run() {
-               for (;;) {
-                  // Optimization, first try without any locks
+      private final class ExecutorTask implements Runnable {
+
+         @Override
+         public void run() {
+            do {
+               //if there is no thread active then we run
+               if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING))
{
                   Runnable task = tasks.poll();
-                  if (task == null) {
-                     synchronized (tasks) {
-                        // if it's null we need to retry now holding the lock on tasks
-                        // this is because running=false and tasks.empty must be an atomic
operation
-                        // so we have to retry before setting the tasks to false
-                        // this is a different approach to the anti-pattern on synchronize-retry,
-                        // as this is just guaranteeing the running=false and tasks.empty
being an atomic operation
-                        task = tasks.poll();
-                        if (task == null) {
-                           running = false;
-                           return;
-                        }
+                  //while the queue is not empty we process in order
+                  while (task != null) {
+                     try {
+                        task.run();
                      }
+                     catch (ActiveMQInterruptedException e) {
+                        // This could happen during shutdowns. Nothing to be concerned about
here
+                        ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
+                     }
+                     catch (Throwable t) {
+                        ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
+                     }
+                     task = tasks.poll();
                   }
-                  try {
-                     task.run();
-                  }
-                  catch (ActiveMQInterruptedException e) {
-                     // This could happen during shutdowns. Nothing to be concerned about
here
-                     ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
-                  }
-                  catch (Throwable t) {
-                     ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
-                  }
+                  //set state back to not running.
+                  stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
                }
-            }
-         };
-      }
-
-      /**
-       * Run a task.
-       *
-       * @param command the task to run.
-       */
-      @Override
-      public void execute(final Runnable command) {
-         synchronized (tasks) {
-            tasks.add(command);
-            if (!running) {
-               running = true;
-               parent.execute(runner);
-            }
+               else {
+                  return;
+               }
+               //we loop again based on tasks not being empty. Otherwise there is a window
where the state is running,
+               //but poll() has returned null, so a submitting thread will believe that it
does not need re-execute.
+               //this check fixes the issue
+            } while (!tasks.isEmpty());
          }
       }
 
       @Override
       public String toString() {
-         return "OrderedExecutor(running=" + running + ", tasks=" + tasks + ")";
+         return "OrderedExecutor(tasks=" + tasks + ")";
       }
    }
 }


Mime
View raw message