activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r890887 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/dispatch/internal/ main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...
Date Tue, 15 Dec 2009 17:03:20 GMT
Author: chirino
Date: Tue Dec 15 17:03:19 2009
New Revision: 890887

URL: http://svn.apache.org/viewvc?rev=890887&view=rev
Log:
Optimizing the the simple queue impl

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
Tue Dec 15 17:03:19 2009
@@ -31,7 +31,7 @@
     public void dispatchMain();
     
     public DispatchQueue getCurrentQueue();
-
+    
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue
queue);
 
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -34,11 +34,13 @@
  */
 abstract public class AbstractSerialDispatchQueue extends AbstractDispatchObject implements
DispatchQueue, Runnable {
 
-    private final String label;
-    private final AtomicInteger suspendCounter = new AtomicInteger();
-    private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
-    private final AtomicLong size = new AtomicLong();
-    private final Set<DispatchOption> options;
+    protected final String label;
+    protected final AtomicInteger suspendCounter = new AtomicInteger();
+    
+    protected final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    
+    protected final AtomicLong size = new AtomicLong();
+    protected final Set<DispatchOption> options;
 
     public AbstractSerialDispatchQueue(String label, DispatchOption...options) {
         this.label = label;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
Tue Dec 15 17:03:19 2009
@@ -199,6 +199,14 @@
 
     public DispatchQueue getCurrentQueue() {
         return CURRENT_QUEUE.get();
+    }
+
+    public DispatchQueue getCurrentThreadQueue() {
+        DispatcherThread thread = DispatcherThread.CURRENT.get();
+        if( thread==null ) {
+            return null;
+        }
+        return thread.currentDispatchQueue;
     }    
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
Tue Dec 15 17:03:19 2009
@@ -33,7 +33,7 @@
 
     static public final ThreadLocal<DispatcherThread> CURRENT = new ThreadLocal<DispatcherThread>();
 
-    private final ThreadDispatchQueue dispatchQueues[];
+    final ThreadDispatchQueue dispatchQueues[];
     
     static final boolean DEBUG = false;
     private Thread thread;
@@ -50,6 +50,8 @@
 
     // Dispatch queue for requests from other threads:
     final LinkedNodeList<ForeignEvent>[] foreignQueue = createForeignQueue();
+    
+    ThreadDispatchQueue currentDispatchQueue;
 
     private static final int[] TOGGLE = new int[] { 1, 0 };
     int foreignToggle = 0;
@@ -168,7 +170,8 @@
                 // If no local work available wait for foreign work:
                 while((pdc = priorityQueue.poll())!=null){
                     if( pdc.priority < dispatchQueues.length ) {
-                        AdvancedDispatcher.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
+                        currentDispatchQueue = dispatchQueues[pdc.priority];
+                        AdvancedDispatcher.CURRENT_QUEUE.set(currentDispatchQueue);
                     }
                     
                     if (pdc.tracker != null) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -91,7 +91,7 @@
     }
 
     public DispatchQueue getTargetQueue() {
-        throw new UnsupportedOperationException();
+        return null;
     }
 
     public void release() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
Tue Dec 15 17:03:19 2009
@@ -24,10 +24,13 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final public class DispatcherThread extends Thread {
-    private static final int MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO = 10000;
+    
+    private static final int MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL = 1000;
     private final SimpleDispatcher dispatcher;
     final ThreadDispatchQueue[] threadQueues;
     final AtomicLong threadQueuedRunnables = new AtomicLong();
+    final IntegerCounter executionCounter = new IntegerCounter();
+    ThreadDispatchQueue currentThreadQueue;
         
     public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
         this.dispatcher = dispatcher;
@@ -42,63 +45,103 @@
     @Override
     public void run() {
         GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
+        final int PRIORITIES = threadQueues.length;
+        int processGlobalQueueCount = PRIORITIES;
+        
         try {
-            outer: while( true ) {
-                int counter=0;
-                for (ThreadDispatchQueue queue : threadQueues) {
-                    SimpleDispatcher.CURRENT_QUEUE.set(queue.globalQueue);
-                    Runnable runnable;
-                    while( (runnable = queue.poll())!=null ) {
-                        dispatch(runnable);
-                        counter++;
+            start: for(;;) {
+                
+                executionCounter.set(MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL);
+                
+                // Process the local non-synchronized queues.
+                // least contention
+                outer: while( executionCounter.get() > 0 ) {
+                    processGlobalQueueCount=PRIORITIES;
+                    for (int i=0; i < PRIORITIES; i++) {
+                        currentThreadQueue = threadQueues[i];
+                        Runnable runnable = currentThreadQueue.pollLocal();
+                        if( runnable==null ) {
+                            continue;
+                        }
+                        
+                        SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
+                        processGlobalQueueCount=i;
+                        for(;;) {
+                            dispatch(runnable);
+                            if( executionCounter.decrementAndGet() <= 0 ) {
+                                break outer;
+                            }
+                            runnable = currentThreadQueue.pollLocal();
+                            if( runnable == null ) {
+                                break;
+                            }
+                        }
+                    }
+                    
+                    // There was no work to do in the local queues..
+                    if( processGlobalQueueCount == PRIORITIES) {
+                        break;
                     }
-                }
-                if( counter!=0 ) {
-                    // don't service the global queues until the thread queues are 
-                    // drained.
-                    continue;
                 }
                 
-                for (SimpleQueue queue : globalQueues) {
-                    SimpleDispatcher.CURRENT_QUEUE.set(queue);
-                    
-                    Runnable runnable;
-                    while( (runnable = queue.poll())!=null ) {
-                        dispatch(runnable);
-                        counter++;
-                        
-                        // Thread queues have the priority.
-                        if( threadQueuedRunnables.get()!=0 ) {
-                            continue outer;
+                // Process the local synchronized queues. 
+                // medium contention
+                outer: while( executionCounter.get() > 0 ) {
+                    processGlobalQueueCount=PRIORITIES;
+                    for (int i=0; i < PRIORITIES; i++) {
+                        currentThreadQueue = threadQueues[i];
+                        Runnable runnable = currentThreadQueue.pollShared();
+                        if( runnable==null ) {
+                            continue;
+                        }
+                        SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
+                        processGlobalQueueCount=i;
+                        for(;;) {
+                            dispatch(runnable);
+                            if( executionCounter.decrementAndGet() <= 0 ) {
+                                break outer;
+                            }
+                            runnable = currentThreadQueue.pollShared();
+                            if( runnable == null ) {
+                                break;
+                            }
                         }
                     }
+                    
+                    // There was no work to do in the local queues..
+                    if( processGlobalQueueCount == PRIORITIES) {
+                        break;
+                    }
+                }
+                
+                // Process the global synchronized queues. 
+                // most contention
+                for (int i=0; i < processGlobalQueueCount; i++) {
+                    currentThreadQueue = threadQueues[i];
+                    GlobalDispatchQueue queue = globalQueues[i];
+                    Runnable runnable = queue.poll();
+                    if( runnable==null ) {
+                        continue;
+                    }
+                    // We only execute 1 global runnable at a time,
+                    // hoping it generates local work for us.
+                    SimpleDispatcher.CURRENT_QUEUE.set(queue);
+                    dispatch(runnable);
+                    continue start;
+                }
+                
+                if( executionCounter.get()!=MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL ) {
+                    continue start;
                 }
-                if( counter!=0 ) {
-                    // don't wait for wake up until we could find 
-                    // no runnables to dispatch.
-                    continue;
-                }
-            
-//        GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
-//        while( true ) {
-//
-//            if( dispatch(threadQueues[0]) 
-//                || dispatch(globalQueues[0]) 
-//                || dispatch(threadQueues[1]) 
-//                || dispatch(globalQueues[1]) 
-//                || dispatch(threadQueues[2]) 
-//                || dispatch(globalQueues[2]) 
-//                ) {
-//                continue;
-//            }
-//        
+                
+                // If we get here then there was no work in the global queues..
                 try {
                     waitForWakeup();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                     return;
                 }
-            }
+            }            
         } catch (Shutdown e) {
         }
     }
@@ -107,23 +150,6 @@
     static class Shutdown extends RuntimeException {
     }
 
-    private boolean dispatch(SimpleQueue queue) {
-        int counter=0;
-        Runnable runnable;
-        while( counter < MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO ) {
-            runnable = queue.poll();
-            if( runnable == null ) {
-                break;
-            }        
-            if( counter==0 ) {
-                SimpleDispatcher.CURRENT_QUEUE.set(queue);
-            }
-            dispatch(runnable);
-            counter++;
-        }
-        return counter!=0;
-    }
-
     private void dispatch(Runnable runnable) {
         try {
             runnable.run();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -17,15 +17,14 @@
 package org.apache.activemq.dispatch.internal.simple;
 
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchOption;
-import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
@@ -56,9 +55,14 @@
     }
 
     public void dispatchAsync(Runnable runnable) {
-        this.counter.incrementAndGet();
-        runnables.add(runnable);
-        dispatcher.wakeup();
+        DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+        if( thread==null ) {
+            this.counter.incrementAndGet();
+            runnables.add(runnable);
+            dispatcher.wakeup();
+        } else {
+            thread.currentThreadQueue.dispatchAsync(runnable);
+        }
     }
 
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java?rev=890887&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
(added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
Tue Dec 15 17:03:19 2009
@@ -0,0 +1,81 @@
+package org.apache.activemq.dispatch.internal.simple;
+
+
+public class IntegerCounter {
+    
+    int counter;
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        IntegerCounter other = (IntegerCounter) obj;
+        if (counter != other.counter)
+            return false;
+        return true;
+    }
+    
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + counter;
+        return result;
+    }
+
+    public final int addAndGet(int delta) {
+        counter+=delta;
+        return counter;
+    }
+
+    public final int decrementAndGet() {
+        return --counter;
+    }
+
+    public final int get() {
+        return counter;
+    }
+
+    public final int getAndAdd(int delta) {
+        int rc = counter;
+        counter += delta;
+        return rc;
+    }
+
+    public final int getAndDecrement() {
+        int rc = counter;
+        counter --;
+        return rc;
+    }
+
+    public final int getAndIncrement() {
+        return counter++;
+    }
+
+    public final int getAndSet(int newValue) {
+        int rc = counter;
+        counter = newValue;
+        return rc;
+    }
+
+    public final int incrementAndGet() {
+        return ++counter;
+    }
+
+    public int intValue() {
+        return counter;
+    }
+
+    public final void set(int newValue) {
+        counter = newValue;
+    }
+
+    public String toString() {
+        return Integer.toString(counter);
+    }
+    
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.dispatch.internal.simple;
 
+import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.dispatch.DispatchOption;
@@ -29,10 +30,15 @@
     private final SimpleDispatcher dispatcher;
     private volatile boolean stickToThreadOnNextDispatch; 
     private volatile boolean stickToThreadOnNextDispatchRequest; 
+    private final LinkedList<Runnable> localEnqueues = new LinkedList<Runnable>();
+    private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
     
     SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption...options)
{
         super(label, options);
         this.dispatcher = dispatcher;
+        if( getOptions().contains(DispatchOption.STICK_TO_DISPATCH_THREAD) ) {
+            stickToThreadOnNextDispatch=true;
+        }
     }
 
     @Override
@@ -45,12 +51,8 @@
     }
     
     @Override
-    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
-        dispatcher.timerThread.addRelative(runnable, this, delay, unit);
-    }
-
-    @Override
-    protected void dispatchSelfAsync() {
+    public void dispatchAsync(Runnable runnable) {
+        
         if( stickToThreadOnNextDispatchRequest ) {
             SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
             if( current!=null ) {
@@ -65,10 +67,15 @@
                 stickToThreadOnNextDispatchRequest=false;
             }
         }
-        super.dispatchSelfAsync();
+
+        // We can take a shortcut...
+        if( executing.get()!=null ) {
+            localEnqueues.add(runnable);
+        } else {
+            super.dispatchAsync(runnable);
+        }
     }
     
-    @Override
     public void run() {
         SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
         if( stickToThreadOnNextDispatch ) {
@@ -78,14 +85,62 @@
                 setTargetQueue(global.getTargetQueue());
             }
         }
+        
+        DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+        
         SimpleDispatcher.CURRENT_QUEUE.set(this);
+        executing.set(true);
         try {
-            super.run();
+            
+            Runnable runnable;
+            long lsize = size.get();
+            while( suspendCounter.get() <= 0 && lsize > 0 ) {
+                
+                runnable = runnables.poll();
+                if( runnable!=null ) {
+                    runnable.run();
+                    lsize = size.decrementAndGet();
+                    if( lsize==0 ) {
+                        release();
+                    }
+                    if( thread.executionCounter.decrementAndGet() <= 0 ) {
+                        return;
+                    }
+                }
+            }
+            
+            while( (runnable = localEnqueues.poll())!=null ) {
+                runnable.run();
+                if( thread.executionCounter.decrementAndGet() <= 0 ) {
+                    return;
+                }
+            }
+            
         } finally {
+            executing.remove();
+            
+            if( !localEnqueues.isEmpty() ) {
+                
+                long lastSize = size.getAndAdd(localEnqueues.size());
+                if( lastSize==0 ) {
+                    retain();
+                }
+                runnables.addAll(localEnqueues);
+                localEnqueues.clear();
+                
+                if( suspendCounter.get()<=0 ) {
+                    dispatchSelfAsync();
+                }
+            }
             SimpleDispatcher.CURRENT_QUEUE.set(current);
         }
     }
 
+    @Override
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        dispatcher.timerThread.addRelative(runnable, this, delay, unit);
+    }
+
     public DispatchPriority getPriority() {
         throw new UnsupportedOperationException();
     }
@@ -109,4 +164,5 @@
     public SimpleQueue getTargetQueue() {
         return (SimpleQueue) targetQueue;
     }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
Tue Dec 15 17:03:19 2009
@@ -116,7 +116,7 @@
     public void shutdown() {
         
         Runnable countDown = new Runnable() {
-            AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length+1);
+            AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
             public void run() {
                 if( shutdownCountDown.decrementAndGet()==0 ) {
                     // Notify any registered shutdown watchers.
@@ -126,10 +126,10 @@
             }
         };
 
-        timerThread.shutdown(countDown);
+        timerThread.shutdown(null);
         for (int i = 0; i < dispatchers.length; i++) {
             ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
-            queue.runnables.add(countDown);
+            queue.dispatchAsync(countDown);
         }
     }
 
@@ -141,4 +141,12 @@
         return CURRENT_QUEUE.get();
     }
     
+    public DispatchQueue getCurrentThreadQueue() {
+        DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+        if( thread == null ) {
+            return null;
+        }
+        return thread.currentThreadQueue;
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
Tue Dec 15 17:03:19 2009
@@ -5,7 +5,6 @@
 
 public interface SimpleQueue extends DispatchQueue {
 
-    Runnable poll();
     DispatchPriority getPriority();
     
     SerialDispatchQueue isSerialDispatchQueue();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -69,24 +69,18 @@
         }
     }
 
-    public Runnable poll() {
-        
-        // This method should only be called by our dispatcher 
-        // thread.
-        assert Thread.currentThread()==dispatcher;
-        
-        Runnable rc = localRunnables.poll();
-        if( rc !=null ) {
-            return rc;
-        }
-        
-        rc = runnables.poll();
+    public Runnable pollShared() {
+        Runnable rc = runnables.poll();
         if( rc !=null ) {
             counter.decrementAndGet();
         }
         return rc;
     }
 
+    public Runnable pollLocal() {
+        return localRunnables.poll();
+    }
+
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
         throw new RuntimeException("TODO: implement me.");
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Tue Dec 15 17:03:19 2009
@@ -22,8 +22,6 @@
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
 import org.apache.activemq.dispatch.internal.simple.SimpleDispatcher;
 
-import static org.apache.activemq.dispatch.DispatchPriority.*;
-
 import static java.lang.String.*;
 
 /**
@@ -35,33 +33,35 @@
     public static void main(String[] args) throws Exception {
         Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
         advancedSystem.retain();
-        benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
-        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test",
DispatchOption.STICK_TO_CALLER_THREAD));
+        benchmarkGlobal("advanced global queue", advancedSystem);
+        benchmarkSerial("advanced private serial queue", advancedSystem);
 
         RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
         advancedSystem.addShutdownWatcher(latch);
         advancedSystem.release();
         latch.await();
 
-        Dispatcher simpleSystem = new SimpleDispatcher(new DispatcherConfig());
+        DispatcherConfig config = new DispatcherConfig();
+        config.setThreads(6);
+        Dispatcher simpleSystem = new SimpleDispatcher(config);
         simpleSystem.retain();
         
-        benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
-        benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test",
DispatchOption.STICK_TO_CALLER_THREAD));
+        benchmarkGlobal("simple global queue", simpleSystem);
+        benchmarkSerial("simple private serial queue", simpleSystem);
 
         latch = new RunnableCountDownLatch(1);
-        advancedSystem.addShutdownWatcher(latch);
-        advancedSystem.release();
+        simpleSystem.addShutdownWatcher(latch);
+        simpleSystem.release();
         latch.await();
     }
 
-    private static void benchmark(String name, Dispatcher dispatcher, DispatchQueue queue)
throws InterruptedException {
+    private static void benchmarkSerial(String name, Dispatcher dispatcher) throws InterruptedException
{
         // warm the JIT up..
-        benchmarkWork(dispatcher, queue, 100000);
+        benchmarkSerialWork(dispatcher, 100000);
         
         int iterations = 1000*1000*20;
         long start = System.nanoTime();
-        benchmarkWork(dispatcher, queue, iterations);
+        benchmarkSerialWork(dispatcher, iterations);
         long end = System.nanoTime();
         
         double durationMS = 1.0d*(end-start)/1000000d;
@@ -70,13 +70,14 @@
         System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec",
name, durationMS, rate));
     }
 
-    private static void benchmarkWork(final Dispatcher dispatcher, final DispatchQueue queue,
int iterations) throws InterruptedException {
+    private static void benchmarkSerialWork(final Dispatcher dispatcher, int iterations)
throws InterruptedException {
+        final DispatchQueue queue = dispatcher.createSerialQueue(null, DispatchOption.STICK_TO_CALLER_THREAD);
         final CountDownLatch counter = new CountDownLatch(iterations);
         Runnable task = new Runnable(){
             public void run() {
                 counter.countDown();
                 if( counter.getCount()>0 ) {
-                    dispatcher.getCurrentQueue().dispatchAsync(this);
+                    queue.dispatchAsync(this);
                 }
             }
         };
@@ -85,4 +86,51 @@
         }
         counter.await();
     }
+    
+    private static void benchmarkGlobal(String name, Dispatcher dispatcher) throws InterruptedException
{
+        // warm the JIT up..
+        benchmarkGlobalWork(dispatcher, 100000);
+        
+        int iterations = 1000*1000*20;
+        long start = System.nanoTime();
+        benchmarkGlobalWork(dispatcher, iterations);
+        long end = System.nanoTime();
+        
+        double durationMS = 1.0d*(end-start)/1000000d;
+        double rate = 1000d * iterations / durationMS;
+        
+        System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec",
name, durationMS, rate));
+    }
+    
+    
+    private static final class TestRunnable implements Runnable {
+        private final int counter;
+        private final Runnable onDone;
+        private final DispatchQueue queue;
+
+        private TestRunnable(int counter, DispatchQueue queue, Runnable onDone) {
+            this.counter=counter;
+            this.onDone = onDone;
+            this.queue = queue;
+        }
+
+        public void run() {
+            if( counter ==0 ) {
+                onDone.run();
+            } else {
+                queue.dispatchAsync(new TestRunnable(counter-1, queue, onDone));
+            }
+        }
+    }
+    
+    private static void benchmarkGlobalWork(final Dispatcher dispatcher, int iterations)
throws InterruptedException {
+        final DispatchQueue queue = dispatcher.getGlobalQueue();
+        int PARTITIONS = 1000;
+        RunnableCountDownLatch counter = new RunnableCountDownLatch(PARTITIONS);
+        for (int i = 0; i < PARTITIONS; i++) {
+            queue.dispatchAsync(new TestRunnable(iterations/PARTITIONS, queue, counter));
+        }
+        counter.await();
+    }
+
 }



Mime
View raw message