activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r892359 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple: GlobalDispatchQueue.java SerialDispatchQueue.java ThreadDispatchQueue.java
Date Fri, 18 Dec 2009 19:46:38 GMT
Author: chirino
Date: Fri Dec 18 19:46:38 2009
New Revision: 892359

URL: http://svn.apache.org/viewvc?rev=892359&view=rev
Log:
The SerialDispatchQueue can now choose how it gets dispatched: locally vs. globally.


Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=892359&r1=892358&r2=892359&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Fri Dec 18 19:46:38 2009
@@ -36,7 +36,7 @@
 
     private final SimpleDispatcher dispatcher;
     final String label;
-    final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    final ConcurrentLinkedQueue<Runnable> globalRunnables = new ConcurrentLinkedQueue<Runnable>();
     final AtomicLong counter;
     private final DispatchPriority priority;
 
@@ -57,13 +57,22 @@
 
     public void dispatchAsync(Runnable runnable) {
         DispatcherThread thread = DispatcherThread.currentDispatcherThread();
-        if( thread==null ) {
-            this.counter.incrementAndGet();
-            runnables.add(runnable);
-            dispatcher.wakeup();
+        if( runnable.getClass() == SerialDispatchQueue.class ) {
+            SerialDispatchQueue queue = ((SerialDispatchQueue)runnable);
+            queue.pick(this, thread);
         } else {
-            thread.currentThreadQueue.dispatchAsync(runnable);
-        }
+            if( thread==null ) {
+                enqueueExternal(runnable);
+            } else {
+                thread.currentThreadQueue.localEnqueue(runnable);
+            }
+        }        
+    }
+
+    void enqueueExternal(Runnable runnable) {
+        this.counter.incrementAndGet();
+        globalRunnables.add(runnable);
+        dispatcher.wakeup();
     }
 
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
@@ -87,7 +96,7 @@
     }
     
     public Runnable poll() {
-        Runnable rc = runnables.poll();
+        Runnable rc = globalRunnables.poll();
         if( rc !=null ) {
             counter.decrementAndGet();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=892359&r1=892358&r2=892359&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Fri Dec 18 19:46:38 2009
@@ -129,4 +129,16 @@
     public String toString() {
         return IntrospectionSupport.toString(this, "label", "size", "suspended", "retained");
     }
+
+    int localEnqueueCounter;
+    
+    public void pick(GlobalDispatchQueue queue, DispatcherThread thread) {
+        if( thread==null || localEnqueueCounter > 500 ) {
+            localEnqueueCounter=0;
+            queue.enqueueExternal(this);
+        } else {
+            localEnqueueCounter++;
+            thread.currentThreadQueue.localEnqueue(this);
+        }        
+    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=892359&r1=892358&r2=892359&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Fri Dec 18 19:46:38 2009
@@ -36,7 +36,7 @@
 
     final String label;
     final LinkedList<Runnable> localRunnables = new LinkedList<Runnable>();
-    final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    final ConcurrentLinkedQueue<Runnable> globalRunnables = new ConcurrentLinkedQueue<Runnable>();
     final DispatcherThread dispatcher;
     final AtomicLong counter;
     final GlobalDispatchQueue globalQueue;
@@ -61,16 +61,24 @@
         // if the current thread is the dispatcher since we know it's not
         // waiting.
         if( Thread.currentThread()!=dispatcher ) {
-            counter.incrementAndGet();
-            runnables.add(runnable);
-            dispatcher.wakeup();
+            globalEnqueue(runnable);
         } else {
-            localRunnables.add(runnable);
+            localEnqueue(runnable);
         }
     }
 
+    void localEnqueue(Runnable runnable) {
+        localRunnables.add(runnable);
+    }
+
+    void globalEnqueue(Runnable runnable) {
+        counter.incrementAndGet();
+        globalRunnables.add(runnable);
+        dispatcher.wakeup();
+    }
+
     public Runnable pollShared() {
-        Runnable rc = runnables.poll();
+        Runnable rc = globalRunnables.poll();
         if( rc !=null ) {
             counter.decrementAndGet();
         }



Mime
View raw message