activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887139 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/internal/simple/ test/java/org/apache/activemq/dispatch/
Date Fri, 04 Dec 2009 10:04:36 GMT
Author: chirino
Date: Fri Dec  4 10:04:34 2009
New Revision: 887139

URL: http://svn.apache.org/viewvc?rev=887139&view=rev
Log:
giving the simple impl a ThreadDispatchQueue too.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887139&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
(added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
Fri Dec  4 10:04:34 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchSystem;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+/**
+ * A daemon thread that executes runnables for the simple dispatch implementation.
+ * <p>
+ * Each dispatcher owns three {@link ThreadDispatchQueue}s (one per priority slot) and
+ * always drains those before servicing the global queues exposed by the
+ * {@link SimpleDispatchSPI}. When a full pass finds nothing to run, the thread parks
+ * in {@code waitForWakeup()} until {@link #wakeup()} or {@link #globalWakeup()} is
+ * called.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final public class DispatcherThread extends Thread {
+    private final SimpleDispatchSPI spi;
+    private final ThreadDispatchQueue[] threadQueues;
+
+    /**
+     * Number of runnables currently held in this thread's own queues. Every
+     * {@link ThreadDispatchQueue} created by this dispatcher shares this counter.
+     */
+    final AtomicLong threadQueuedRunnables = new AtomicLong();
+
+    /** Monitor used to park and to wake this dispatcher. */
+    private final Object wakeupMutex = new Object();
+
+    /** True while this thread is registered in the SPI's waiting list; guarded by {@code wakeupMutex}. */
+    private boolean inWaitingList;
+
+    /**
+     * Creates (but does not start) a dispatcher thread.
+     *
+     * @param spi     the dispatch SPI whose global queues this thread services
+     * @param ordinal zero-based index of this dispatcher; used only to build the thread name
+     */
+    public DispatcherThread(SimpleDispatchSPI spi, int ordinal) {
+        this.spi = spi;
+        // values() returns a fresh array on every call, so fetch it once.
+        DispatchQueuePriority[] priorities = DispatchQueuePriority.values();
+        this.threadQueues = new ThreadDispatchQueue[3];
+        for (int i = 0; i < threadQueues.length; i++) {
+            threadQueues[i] = new ThreadDispatchQueue(this, priorities[i]);
+        }
+        setName("dispatcher:"+(ordinal+1));
+        setDaemon(true);
+    }
+    
+    /**
+     * Main loop: drain the thread queues, then the global queues, and park only when
+     * a complete pass dispatched nothing. Returns when the thread is interrupted
+     * while parked.
+     */
+    @Override
+    public void run() {
+        outer: while( true ) {
+            int counter=0;
+            for (SimpleQueue queue : threadQueues) {
+                DispatchSystem.CURRENT_QUEUE.set(queue);
+                Runnable runnable;
+                while( (runnable = queue.poll())!=null ) {
+                    dispatch(runnable);
+                    counter++;
+                }
+            }
+            if( counter!=0 ) {
+                // don't service the global queues until the thread queues are 
+                // drained.
+                continue;
+            }
+            
+            for (SimpleQueue queue : spi.globalQueues) {
+                // Work taken from a global queue runs with this thread's queue of the
+                // same priority set as the current queue.
+                DispatchSystem.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
+                
+                Runnable runnable;
+                while( (runnable = queue.poll())!=null ) {
+                    dispatch(runnable);
+                    counter++;
+                    
+                    // Thread queues have the priority.
+                    if( threadQueuedRunnables.get()!=0 ) {
+                        continue outer;
+                    }
+                }
+            }
+            if( counter!=0 ) {
+                // don't wait for wake up until we could find 
+                // no runnables to dispatch.
+                continue;
+            }
+            try {
+                waitForWakeup();
+            } catch (InterruptedException e) {
+                // Treat interruption as a request to stop: restore the interrupt
+                // status so it stays observable on this thread, then exit the loop.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Runs a single runnable, printing (rather than propagating) anything it throws
+     * so that one failing task cannot terminate the dispatcher thread.
+     */
+    private void dispatch(Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Parks this thread until either its own queues or the global queues report
+     * pending runnables.
+     * <p>
+     * The emptiness check is made while holding {@code wakeupMutex}, and is repeated
+     * after the thread registers itself in the SPI's waiting list. Without both
+     * steps a producer could enqueue work and issue its wake-up between the check
+     * and the {@code wait()}, and the notification would be lost.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    private void waitForWakeup() throws InterruptedException {
+        synchronized(wakeupMutex) {
+            while( threadQueuedRunnables.get()==0 && spi.globalQueuedRunnables.get()==0 ) {
+                if( !inWaitingList ) {
+                    inWaitingList=true;
+                    spi.addWaitingDispatcher(this);
+                    // A global runnable may have been queued before we became
+                    // visible as a waiter; look again before going to sleep.
+                    continue;
+                }
+                wakeupMutex.wait();
+            }
+        }
+    }
+
+    /**
+     * Wakes this dispatcher on behalf of the global queues. Clears the waiting-list
+     * flag so the thread registers itself again the next time it parks.
+     */
+    public void globalWakeup() {
+        synchronized(wakeupMutex) {
+            inWaitingList=false;
+            wakeupMutex.notify();
+        }
+    }
+    
+    /**
+     * Wakes this dispatcher because one of its own thread queues received work.
+     * The waiting-list registration is left untouched.
+     */
+    public void wakeup() {
+        synchronized(wakeupMutex) {
+            wakeupMutex.notify();
+        }
+    }
+   
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887139&r1=887138&r2=887139&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
Fri Dec  4 10:04:34 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.dispatch.internal.simple;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
@@ -26,15 +27,19 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class GlobalDispatchQueue implements DispatchQueue {
+public class GlobalDispatchQueue implements SimpleQueue {
 
     private final SimpleDispatchSPI system;
     final String label;
     final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
-    
+    final AtomicLong counter;
+    private final DispatchQueuePriority priority;
+
     public GlobalDispatchQueue(SimpleDispatchSPI system, DispatchQueuePriority priority)
{
         this.system = system;
+        this.priority = priority;
         this.label=priority.toString();
+        this.counter = system.globalQueuedRunnables;
     }
 
     public String getLabel() {
@@ -42,6 +47,7 @@
     }
 
     public void dispatchAsync(Runnable runnable) { // enqueue a runnable on this global queue
+        this.counter.incrementAndGet(); // counter is the SPI-wide globalQueuedRunnables; NOTE(review): bumped before add(), so a rejected (null) runnable would leave it too high - confirm callers never pass null
         runnables.add(runnable);
         system.wakeup(); // asks the SPI to wake a parked dispatcher, if one is registered
     }
@@ -81,5 +87,16 @@
     public void setTargetQueue(DispatchQueue queue) {
         throw new UnsupportedOperationException();
     }
+    
+    public Runnable poll() { // removes and returns the next queued runnable, or null when the queue is empty
+        Runnable rc = runnables.poll();
+        if( rc !=null ) { // only a successful removal may lower the shared pending count
+            counter.decrementAndGet();
+        }
+        return rc;
+    }
 
+    public DispatchQueuePriority getPriority() { // returns the priority this queue was constructed with
+        return priority;
+    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java?rev=887139&r1=887138&r2=887139&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
Fri Dec  4 10:04:34 2009
@@ -17,6 +17,8 @@
 package org.apache.activemq.dispatch.internal.simple;
 
 import java.nio.channels.SelectableChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
@@ -37,20 +39,22 @@
         
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[]; 
-    final Dispatcher dispatchers[];
-
-    private final Object wakeupMutex = new Object();
+    final DispatcherThread dispatchers[];
     final AtomicLong globalQueuedRunnables = new AtomicLong();
     
+    final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
+    final AtomicInteger waitingDispatcherCount = new AtomicInteger();
+
+    
     public SimpleDispatchSPI(int size) {
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i]
);
         }
                                   
-        dispatchers = new Dispatcher[size];
+        dispatchers = new DispatcherThread[size];
         for (int i = 0; i < size; i++) {
-            dispatchers[i] = new Dispatcher(this, i);
+            dispatchers[i] = new DispatcherThread(this, i);
             dispatchers[i].start();
             
         }
@@ -77,20 +81,22 @@
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue
queue) {
         return null;
     }
-    
-    public void waitForWakeup() throws InterruptedException {
-        while( globalQueuedRunnables.get()==0 ) {
-            synchronized(wakeupMutex) {
-                wakeupMutex.wait();
-            }
-        }
+
+    public void addWaitingDispatcher(DispatcherThread dispatcher) { // registers a dispatcher that is about to park so wakeup() can find it
+        waitingDispatcherCount.incrementAndGet(); // count is raised before the queue insert, so it may briefly exceed the queue's contents
+        waitingDispatchers.add(dispatcher);
     }
     
-    void wakeup() {
-        if( globalQueuedRunnables.incrementAndGet() < dispatchers.length ) {
-            synchronized(wakeupMutex) {
-                wakeupMutex.notify();
+    public void wakeup() { // wakes at most one dispatcher from the waiting list
+        int value = waitingDispatcherCount.get(); // cheap check that skips the queue poll when nobody is registered
+        if( value!=0 ) {
+            DispatcherThread dispatcher = waitingDispatchers.poll();
+            if( dispatcher!=null ) { // null is possible: the count is incremented before the dispatcher is added
+                waitingDispatcherCount.decrementAndGet();
+                dispatcher.globalWakeup();
             }
         }
     }
+
+    
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=887139&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
(added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
Fri Dec  4 10:04:34 2009
@@ -0,0 +1,10 @@
+package org.apache.activemq.dispatch.internal.simple;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+public interface SimpleQueue extends DispatchQueue { // a DispatchQueue that a DispatcherThread can drain directly
+
+    Runnable poll(); // next runnable to execute; the dispatcher loop treats null as "queue is empty"
+    DispatchQueuePriority getPriority(); // priority of this queue; the dispatcher uses its ordinal to pick the matching thread queue
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887139&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
(added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
Fri Dec  4 10:04:34 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.QueueSupport;
+
+/**
+ * A queue of runnables bound to a single {@link DispatcherThread}.
+ * <p>
+ * Runnables submitted through {@link #dispatchAsync(Runnable)} are held in a
+ * {@link ConcurrentLinkedQueue} and removed again by {@link #poll()}. The number of
+ * pending runnables is tracked in the owning dispatcher's
+ * {@code threadQueuedRunnables} counter, which all thread queues of that dispatcher
+ * share.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ThreadDispatchQueue implements SimpleQueue {
+
+    final String label;
+    final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    private final DispatcherThread dispatcher;
+    /** The owning dispatcher's pending-runnable counter (not private to this queue). */
+    final AtomicLong counter;
+    private final DispatchQueuePriority priority;
+    
+    /**
+     * @param dispatcher the thread that owns and drains this queue
+     * @param priority   the priority of this queue; its string form is used as the label
+     */
+    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchQueuePriority priority) {
+        this.dispatcher = dispatcher;
+        this.priority = priority;
+        this.label=priority.toString();
+        this.counter = dispatcher.threadQueuedRunnables;
+    }
+
+    /** @return the label of this queue, i.e. the string form of its priority */
+    public String getLabel() {
+        return label;
+    }
+
+    /**
+     * Queues a runnable for execution and wakes the owning dispatcher.
+     *
+     * @param runnable the task to run; must not be null
+     * @throws NullPointerException if {@code runnable} is null
+     */
+    public void dispatchAsync(Runnable runnable) {
+        // Reject null before touching the counter. The backing queue would throw the
+        // same NullPointerException, but only after the count had been incremented,
+        // leaving it permanently one too high so the dispatcher would never park.
+        if( runnable==null ) {
+            throw new NullPointerException("runnable");
+        }
+        counter.incrementAndGet();
+        runnables.add(runnable);
+        dispatcher.wakeup();
+    }
+
+    /**
+     * Removes the next runnable, lowering the shared pending count when one is found.
+     *
+     * @return the next runnable, or null if this queue is empty
+     */
+    public Runnable poll() {
+        Runnable rc = runnables.poll();
+        if( rc !=null ) {
+            counter.decrementAndGet();
+        }
+        return rc;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        throw new UnsupportedOperationException("TODO: implement me.");
+    }
+
+    /** Equivalent to {@code dispatchApply(1, runnable)}. */
+    public void dispatchSync(final Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable);
+    }
+    
+    /** Delegates to {@code QueueSupport.dispatchApply} with this queue as the target. */
+    public void dispatchApply(int iterations, final Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void resume() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void suspend() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public <Context> Context getContext() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public <Context> void setContext(Context context) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void setFinalizer(Runnable finalizer) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void setTargetQueue(DispatchQueue queue) {
+        throw new UnsupportedOperationException();
+    }
+    
+    /** @return the priority this queue was created with */
+    public DispatchQueuePriority getPriority() {
+        return priority;
+    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887139&r1=887138&r2=887139&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Fri Dec  4 10:04:34 2009
@@ -33,12 +33,12 @@
     
     public static void main(String[] args) throws Exception {
         DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
-        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
         benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
+        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
 
         DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
-        benchmark("simple private serial queue", simpleSystem, simpleSystem.createQueue("test"));
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
+        benchmark("simple private serial queue", simpleSystem, simpleSystem.createQueue("test"));
     }
 
     private static void benchmark(String name, DispatchSPI spi, DispatchQueue queue) throws
InterruptedException {



Mime
View raw message