activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r891454 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/internal/ main/java/org/apache/activemq/dispatch/internal/simple/ test/java/org/apache/activemq/dispatch/
Date Wed, 16 Dec 2009 22:21:07 GMT
Author: chirino
Date: Wed Dec 16 22:21:06 2009
New Revision: 891454

URL: http://svn.apache.org/viewvc?rev=891454&view=rev
Log:
Pushed down the optimization in the simple serial queue into the abstract serial queue.  It is now
applicable to the advanced impl as well.


Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=891454&r1=891453&r2=891454&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
Wed Dec 16 22:21:06 2009
@@ -19,14 +19,15 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.simple.IntegerCounter;
 
 /**
  * 
@@ -36,10 +37,15 @@
 
     protected final String label;
     protected final AtomicInteger suspendCounter = new AtomicInteger();
+    protected final AtomicInteger executeCounter = new AtomicInteger();
     
-    protected final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    protected final AtomicLong externalQueueSize = new AtomicLong();
+    protected final AtomicLong queueSize = new AtomicLong();
+    protected final ConcurrentLinkedQueue<Runnable> externalQueue = new ConcurrentLinkedQueue<Runnable>();
+
+    private final LinkedList<Runnable> localQueue = new LinkedList<Runnable>();
+    private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
     
-    protected final AtomicLong size = new AtomicLong();
     protected final Set<DispatchOption> options;
 
     public AbstractSerialDispatchQueue(String label, DispatchOption...options) {
@@ -60,9 +66,7 @@
 
     public void resume() {
         if( suspendCounter.decrementAndGet() == 0 ) {
-            if( size.get() != 0 ) {
-                dispatchSelfAsync();
-            }
+            dispatchSelfAsync();
         }
     }
 
@@ -75,16 +79,21 @@
     }
 
     public void dispatchAsync(Runnable runnable) {
-        if( runnable == null ) {
-            throw new IllegalArgumentException();
-        }
-        long lastSize = size.getAndIncrement();
-        if( lastSize==0 ) {
+        assert runnable != null;
+
+        if( queueSize.getAndIncrement()==0 ) {
             retain();
         }
-        runnables.add(runnable);
-        if( lastSize == 0 && suspendCounter.get()<=0 ) {
-            dispatchSelfAsync();
+
+        // We can take a shortcut...
+        if( executing.get()!=null ) {
+            localQueue.add(runnable);
+        } else {
+            long lastSize = externalQueueSize.getAndIncrement();
+            externalQueue.add(runnable);
+            if( lastSize == 0 && suspendCounter.get()<=0 ) {
+                dispatchSelfAsync();
+            }
         }
     }
 
@@ -93,20 +102,68 @@
     }
 
     public void run() {
-        Runnable runnable;
-        long lsize = size.get();
-        while( suspendCounter.get() <= 0 && lsize > 0 ) {
-            try {
-                runnable = runnables.poll();
-                if( runnable!=null ) {
-                    runnable.run();
-                    lsize = size.decrementAndGet();
-                    if( lsize==0 ) {
-                        release();
+        IntegerCounter limit = new IntegerCounter();
+        limit.set(1000);
+        dispatch(limit);
+    }
+
+    protected void dispatch(IntegerCounter limit) {
+        executing.set(true);
+        // Protection against concurrent execution...
+        // Many threads can try to get in.. but only the first will win..
+        if( executeCounter.getAndIncrement()==0 ) {
+            // Do additional loops for each thread that could
+            // not make it in.  This protects us from exiting
+            // the dispatch loop but still just after a new
+            // thread was trying to get in.
+            do {
+                dispatchLoop(limit);
+            } while( executeCounter.decrementAndGet()>0 );
+        }
+        executing.remove();
+    }
+    
+    private void dispatchLoop(IntegerCounter limit) {
+        int counter=0;
+        try {
+            
+            Runnable runnable;
+            while( suspendCounter.get() <= 0 ) {
+                
+                if( (runnable = localQueue.poll())!=null ) {
+                    counter++;
+                    try {
+                        runnable.run();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                    if( limit.decrementAndGet() <= 0 ) {
+                        return;
                     }
+                    continue;
                 }
-            } catch (Throwable e) {
-                e.printStackTrace();
+    
+                long lsize = externalQueueSize.get();
+                if( lsize>0 ) {
+                    while( lsize > 0 ) {
+                        runnable = externalQueue.poll();
+                        if( runnable!=null ) {
+                            localQueue.add(runnable);
+                            lsize = externalQueueSize.decrementAndGet();
+                        }
+                    }
+                    continue;
+                }
+                
+                break;
+            }
+            
+        } finally {
+            long size = queueSize.addAndGet(-counter);
+            if( size==0 ) { 
+                release();
+            } else {
+                dispatchSelfAsync();
             }
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=891454&r1=891453&r2=891454&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
Wed Dec 16 22:21:06 2009
@@ -17,7 +17,6 @@
 
 package org.apache.activemq.dispatch.internal.simple;
 
-import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.dispatch.DispatchOption;
@@ -30,8 +29,6 @@
     private final SimpleDispatcher dispatcher;
     private volatile boolean stickToThreadOnNextDispatch; 
     private volatile boolean stickToThreadOnNextDispatchRequest; 
-    private final LinkedList<Runnable> localEnqueues = new LinkedList<Runnable>();
-    private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
     
     SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption...options)
{
         super(label, options);
@@ -68,72 +65,28 @@
             }
         }
 
-        // We can take a shortcut...
-        if( executing.get()!=null ) {
-            localEnqueues.add(runnable);
-        } else {
-            super.dispatchAsync(runnable);
-        }
+        super.dispatchAsync(runnable);
     }
     
     public void run() {
         SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
-        if( stickToThreadOnNextDispatch ) {
-            stickToThreadOnNextDispatch=false;
-            GlobalDispatchQueue global = current.isGlobalDispatchQueue();
-            if( global!=null ) {
-                setTargetQueue(global.getTargetQueue());
-            }
-        }
-        
-        DispatcherThread thread = DispatcherThread.currentDispatcherThread();
-        
         SimpleDispatcher.CURRENT_QUEUE.set(this);
-        executing.set(true);
+        
         try {
-            
-            Runnable runnable;
-            long lsize = size.get();
-            while( suspendCounter.get() <= 0 && lsize > 0 ) {
-                
-                runnable = runnables.poll();
-                if( runnable!=null ) {
-                    runnable.run();
-                    lsize = size.decrementAndGet();
-                    if( lsize==0 ) {
-                        release();
-                    }
-                    if( thread.executionCounter.decrementAndGet() <= 0 ) {
-                        return;
-                    }
-                }
-            }
-            
-            while( (runnable = localEnqueues.poll())!=null ) {
-                runnable.run();
-                if( thread.executionCounter.decrementAndGet() <= 0 ) {
-                    return;
+            if( stickToThreadOnNextDispatch ) {
+                stickToThreadOnNextDispatch=false;
+                GlobalDispatchQueue global = current.isGlobalDispatchQueue();
+                if( global!=null ) {
+                    setTargetQueue(global.getTargetQueue());
                 }
             }
             
+            DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+            dispatch(thread.executionCounter);
         } finally {
-            executing.remove();
-            
-            if( !localEnqueues.isEmpty() ) {
-                
-                long lastSize = size.getAndAdd(localEnqueues.size());
-                if( lastSize==0 ) {
-                    retain();
-                }
-                runnables.addAll(localEnqueues);
-                localEnqueues.clear();
-                
-                if( suspendCounter.get()<=0 ) {
-                    dispatchSelfAsync();
-                }
-            }
             SimpleDispatcher.CURRENT_QUEUE.set(current);
         }
+
     }
     
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=891454&r1=891453&r2=891454&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Wed Dec 16 22:21:06 2009
@@ -81,10 +81,9 @@
                 }
             }
         };
-//        for (int i = 0; i < 1000; i++) {
-//        queue.dispatchAsync(task);
-//        }
-        queue.dispatchAsync(task);
+        for (int i = 0; i < 1000; i++) {
+            queue.dispatchAsync(task);
+        }
         counter.await();
     }
     



Mime
View raw message