activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r891451 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple: DispatcherThread.java SimpleDispatcher.java
Date Wed, 16 Dec 2009 22:09:53 GMT
Author: cmacnaug
Date: Wed Dec 16 22:09:53 2009
New Revision: 891451

URL: http://svn.apache.org/viewvc?rev=891451&view=rev
Log:
Fixing a synchronization hole that can result in missed dispatcher wakeups. 

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=891451&r1=891450&r2=891451&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Wed Dec 16 22:09:53 2009
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.dispatch.internal.simple;
 
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -24,103 +26,103 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final public class DispatcherThread extends Thread {
-    
+
     private static final int MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL = 1000;
     private final SimpleDispatcher dispatcher;
     final ThreadDispatchQueue[] threadQueues;
     final AtomicLong threadQueuedRunnables = new AtomicLong();
     final IntegerCounter executionCounter = new IntegerCounter();
     ThreadDispatchQueue currentThreadQueue;
-        
+
     public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
         this.dispatcher = dispatcher;
         this.threadQueues = new ThreadDispatchQueue[dispatcher.globalQueues.length];
         for (int i = 0; i < threadQueues.length; i++) {
             threadQueues[i] = new ThreadDispatchQueue(this, dispatcher.globalQueues[i]);
         }
-        setName(dispatcher.getLabel()+" dispatcher: "+(ordinal+1));
+        setName(dispatcher.getLabel() + " dispatcher: " + (ordinal + 1));
         setDaemon(true);
     }
-    
+
     @Override
     public void run() {
         GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
         final int PRIORITIES = threadQueues.length;
         int processGlobalQueueCount = PRIORITIES;
-        
+
         try {
-            start: for(;;) {
-                
+            start: for (;;) {
+
                 executionCounter.set(MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL);
-                
+
                 // Process the local non-synchronized queues.
                 // least contention
-                outer: while( executionCounter.get() > 0 ) {
-                    processGlobalQueueCount=PRIORITIES;
-                    for (int i=0; i < PRIORITIES; i++) {
+                outer: while (executionCounter.get() > 0) {
+                    processGlobalQueueCount = PRIORITIES;
+                    for (int i = 0; i < PRIORITIES; i++) {
                         currentThreadQueue = threadQueues[i];
                         Runnable runnable = currentThreadQueue.pollLocal();
-                        if( runnable==null ) {
+                        if (runnable == null) {
                             continue;
                         }
-                        
+
                         SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
-                        processGlobalQueueCount=i;
-                        for(;;) {
+                        processGlobalQueueCount = i;
+                        for (;;) {
                             dispatch(runnable);
-                            if( executionCounter.decrementAndGet() <= 0 ) {
+                            if (executionCounter.decrementAndGet() <= 0) {
                                 break outer;
                             }
                             runnable = currentThreadQueue.pollLocal();
-                            if( runnable == null ) {
+                            if (runnable == null) {
                                 break;
                             }
                         }
                     }
-                    
+
                     // There was no work to do in the local queues..
-                    if( processGlobalQueueCount == PRIORITIES) {
+                    if (processGlobalQueueCount == PRIORITIES) {
                         break;
                     }
                 }
-                
-                // Process the local synchronized queues. 
+
+                // Process the local synchronized queues.
                 // medium contention
-                outer: while( executionCounter.get() > 0 ) {
-                    processGlobalQueueCount=PRIORITIES;
-                    for (int i=0; i < PRIORITIES; i++) {
+                outer: while (executionCounter.get() > 0) {
+                    processGlobalQueueCount = PRIORITIES;
+                    for (int i = 0; i < PRIORITIES; i++) {
                         currentThreadQueue = threadQueues[i];
                         Runnable runnable = currentThreadQueue.pollShared();
-                        if( runnable==null ) {
+                        if (runnable == null) {
                             continue;
                         }
                         SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
-                        processGlobalQueueCount=i;
-                        for(;;) {
+                        processGlobalQueueCount = i;
+                        for (;;) {
                             dispatch(runnable);
-                            if( executionCounter.decrementAndGet() <= 0 ) {
+                            if (executionCounter.decrementAndGet() <= 0) {
                                 break outer;
                             }
                             runnable = currentThreadQueue.pollShared();
-                            if( runnable == null ) {
+                            if (runnable == null) {
                                 break;
                             }
                         }
                     }
-                    
+
                     // There was no work to do in the local queues..
-                    if( processGlobalQueueCount == PRIORITIES) {
+                    if (processGlobalQueueCount == PRIORITIES) {
                         break;
                     }
                 }
-                
-                // Process the global synchronized queues. 
+
+                // Process the global synchronized queues.
                 // most contention
-                for (int i=0; i < processGlobalQueueCount; i++) {
+                for (int i = 0; i < processGlobalQueueCount; i++) {
                     currentThreadQueue = threadQueues[i];
                     GlobalDispatchQueue queue = globalQueues[i];
                     Runnable runnable = queue.poll();
-                    if( runnable==null ) {
+                    if (runnable == null) {
                         continue;
                     }
                     // We only execute 1 global runnable at a time,
@@ -129,11 +131,11 @@
                     dispatch(runnable);
                     continue start;
                 }
-                
-                if( executionCounter.get()!=MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL ) {
+
+                if (executionCounter.get() != MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL) {
                     continue start;
                 }
-                
+
                 // If we get here then there was no work in the global queues..
                 try {
                     waitForWakeup();
@@ -141,11 +143,11 @@
                     e.printStackTrace();
                     return;
                 }
-            }            
+            }
         } catch (Shutdown e) {
         }
     }
-    
+
     @SuppressWarnings("serial")
     static class Shutdown extends RuntimeException {
     }
@@ -159,41 +161,39 @@
             e.printStackTrace();
         }
     }
-    
+
     public static DispatcherThread currentDispatcherThread() {
         Thread currentThread = Thread.currentThread();
-        if( currentThread.getClass() == DispatcherThread.class ) {
+        if (currentThread.getClass() == DispatcherThread.class) {
             return (DispatcherThread) currentThread;
         }
         return null;
     }
 
-    private final Object wakeupMutex = new Object();
-    private boolean inWaitingList;
-    
+    private final Semaphore wakeups = new Semaphore(0);
+    private final AtomicBoolean inWaitingList = new AtomicBoolean(false);
+
     private void waitForWakeup() throws InterruptedException {
-        while( threadQueuedRunnables.get()==0 && dispatcher.globalQueuedRunnables.get()==0 ) {
-            synchronized(wakeupMutex) {
-                if( !inWaitingList ) {
-                    dispatcher.addWaitingDispatcher(this);
-                    inWaitingList=true;
+        while (threadQueuedRunnables.get() == 0 && dispatcher.globalQueuedRunnables.get() == 0) {
+            if (!wakeups.tryAcquire()) {
+                if (inWaitingList.compareAndSet(false, true)) {
+                    if (!dispatcher.addWaitingDispatcher(this)) {
+                        inWaitingList.set(false);
+                    }
                 }
-                wakeupMutex.wait();
             }
+            wakeups.acquire();
         }
+        wakeups.drainPermits();
     }
 
     public void globalWakeup() {
-        synchronized(wakeupMutex) {
-            inWaitingList=false;
-            wakeupMutex.notify();
-        }
+        wakeups.release();
+        inWaitingList.set(false);
     }
-    
+
     public void wakeup() {
-        synchronized(wakeupMutex) {
-            wakeupMutex.notify();
-        }
+        wakeups.release();
     }
-   
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=891451&r1=891450&r2=891451&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Wed Dec 16 22:09:53 2009
@@ -32,32 +32,30 @@
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
-
-
 /**
  * Implements a simple dispatch system.
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
-        
+
     public final static ThreadLocal<SimpleQueue> CURRENT_QUEUE = new ThreadLocal<SimpleQueue>();
 
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue(this, "main");
-    final GlobalDispatchQueue globalQueues[]; 
+    final GlobalDispatchQueue globalQueues[];
     final DispatcherThread dispatchers[];
     final AtomicLong globalQueuedRunnables = new AtomicLong();
-    
+
     final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
     final AtomicInteger waitingDispatcherCount = new AtomicInteger();
     private final String label;
     TimerThread timerThread;
-    
+
     public SimpleDispatcher(DispatcherConfig config) {
         this.label = config.getLabel();
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
-            globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i] );
+            globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
         }
         dispatchers = new DispatcherThread[config.getThreads()];
     }
@@ -65,7 +63,7 @@
     public DispatchQueue getMainQueue() {
         return mainQueue;
     }
-    
+
     public DispatchQueue getGlobalQueue() {
         return getGlobalQueue(DEFAULT);
     }
@@ -73,13 +71,13 @@
     public DispatchQueue getGlobalQueue(DispatchPriority priority) {
         return globalQueues[priority.ordinal()];
     }
-    
+
     public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
         AbstractSerialDispatchQueue rc = new SerialDispatchQueue(this, label, options);
         rc.setTargetQueue(getGlobalQueue());
         return rc;
     }
-    
+
     public void dispatchMain() {
         mainQueue.run();
     }
@@ -88,16 +86,22 @@
         return null;
     }
 
-    public void addWaitingDispatcher(DispatcherThread dispatcher) {
-        waitingDispatcherCount.incrementAndGet();
-        waitingDispatchers.add(dispatcher);
+    public boolean addWaitingDispatcher(DispatcherThread dispatcher) {
+        if (globalQueuedRunnables.get() <= 0) {
+            waitingDispatcherCount.incrementAndGet();
+            waitingDispatchers.add(dispatcher);
+            return true;
+        } else {
+            dispatcher.globalWakeup();
+            return false;
+        }
     }
-    
+
     public void wakeup() {
         int value = waitingDispatcherCount.get();
-        if( value!=0 ) {
+        if (value != 0) {
             DispatcherThread dispatcher = waitingDispatchers.poll();
-            if( dispatcher!=null ) {
+            if (dispatcher != null) {
                 waitingDispatcherCount.decrementAndGet();
                 dispatcher.globalWakeup();
             }
@@ -114,11 +118,12 @@
     }
 
     public void shutdown() {
-        
+
         Runnable countDown = new Runnable() {
             AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
+
             public void run() {
-                if( shutdownCountDown.decrementAndGet()==0 ) {
+                if (shutdownCountDown.decrementAndGet() == 0) {
                     // Notify any registered shutdown watchers.
                     SimpleDispatcher.super.shutdown();
                 }
@@ -140,10 +145,10 @@
     public DispatchQueue getCurrentQueue() {
         return CURRENT_QUEUE.get();
     }
-    
+
     public DispatchQueue getCurrentThreadQueue() {
         DispatcherThread thread = DispatcherThread.currentDispatcherThread();
-        if( thread == null ) {
+        if (thread == null) {
             return null;
         }
         return thread.currentThreadQueue;



Mime
View raw message