activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887630 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/dispatch/internal/ main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...
Date Sun, 06 Dec 2009 02:52:47 GMT
Author: chirino
Date: Sun Dec  6 02:52:35 2009
New Revision: 887630

URL: http://svn.apache.org/viewvc?rev=887630&view=rev
Log:
simple package now implements dispatchAfter 

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatchObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Sun Dec  6 02:52:35 2009
@@ -20,18 +20,14 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface DispatchObject {
+public interface DispatchObject extends Retained {
     
-    public void retain();
-    public void release();
-
     public <Context> Context getContext();
     public <Context> void setContext(Context context);
 
     public void suspend();
     public void resume();
 
-    public void setFinalizer(Runnable finalizer);
     public void setTargetQueue(DispatchQueue queue);
     public DispatchQueue getTargetQueue();
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java?rev=887630&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java Sun Dec  6 02:52:35 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+public enum DispatchPriority {
+    HIGH,
+    DEFAULT,
+    LOW;
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java Sun Dec  6 02:52:35 2009
@@ -16,16 +16,19 @@
  */
 package org.apache.activemq.dispatch;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface DispatchQueue extends DispatchObject {
+public interface DispatchQueue extends DispatchObject, Executor {
 
     public void dispatchAsync(Runnable runnable);
     public void dispatchSync(Runnable runnable) throws InterruptedException;
     
-    public void dispatchAfter(long delayMS, Runnable runnable);
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit);
     public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException;
     
     String getLabel();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java Sun Dec  6 02:52:35 2009
@@ -19,15 +19,16 @@
 
 import java.nio.channels.SelectableChannel;
 
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
 
-public interface DispatchSPI {
+public interface DispatchSPI extends Retained {
     public void start();
     public void shutdown(Runnable onShutdown);
     
     public DispatchQueue getMainQueue();
-    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
+    public DispatchQueue getGlobalQueue();
+    public DispatchQueue getGlobalQueue(DispatchPriority priority);
     public DispatchQueue createQueue(String label);
     public void dispatchMain();
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+    
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Sun Dec  6 02:52:35 2009
@@ -26,26 +26,24 @@
  */
 public class DispatchSystem {
 
-    public static enum DispatchQueuePriority {
-        HIGH,
-        DEFAULT,
-        LOW;
-    }
-
     public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
     static public DispatchQueue getCurrentQueue() {
         return CURRENT_QUEUE.get();
     }
 
     private static DispatchSPI spi;
-    
-    private static DispatchSPI cretateDispatchSystemSPI() {
-        return new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
+
+    public static DispatchSPI create() {
+        return create("system", Runtime.getRuntime().availableProcessors());
+    }
+
+    public static SimpleDispatchSPI create(String name, int threads) {
+        return new SimpleDispatchSPI(name, threads);
     }
     
     synchronized public static DispatchSPI spi() {
         if(spi==null) {
-            spi = cretateDispatchSystemSPI();
+            spi = create();
         }
         return spi;
     }
@@ -54,7 +52,11 @@
         return spi().getMainQueue();
     }
     
-    static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+    static public DispatchQueue getGlobalQueue() {
+        return spi().getGlobalQueue();
+    }
+    
+    static public DispatchQueue getGlobalQueue(DispatchPriority priority) {
         return spi().getGlobalQueue(priority);
     }
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java Sun Dec  6 02:52:35 2009
@@ -1,15 +1,14 @@
 package org.apache.activemq.dispatch;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 
 /**
- * Handy interface to signal classes which would like an IDispatcher instance
+ * Handy interface to signal classes which would like a DispatchSPI instance
  * injected into them.
  *  
  * @author chirino
  */
 public interface DispatcherAware {
 
-	public void setDispatcher(AdvancedDispatchSPI dispatcher);
+	public void setDispatcher(DispatchSPI dispatcher);
 	
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java?rev=887630&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java Sun Dec  6 02:52:35 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Retained {
+    
+    public void retain();
+    public void release();
+    public void setShutdownHandler(Runnable shutdownHandler);
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Sun Dec  6 02:52:35 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.dispatch.internal;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.dispatch.DispatchObject;
 import org.apache.activemq.dispatch.DispatchQueue;
 
@@ -25,11 +23,10 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract public class AbstractDispatchObject implements DispatchObject {
+abstract public class AbstractDispatchObject extends BaseRetained implements DispatchObject {
 
     protected volatile Object context;
     protected volatile DispatchQueue targetQueue;
-    protected AtomicReference<Runnable> finalizer = new AtomicReference<Runnable>();
 
     @SuppressWarnings("unchecked")
     public <Context> Context getContext() {
@@ -40,10 +37,6 @@
         this.context = context;
     }
 
-    public void setFinalizer(Runnable finalizer) {
-        this.finalizer.set(finalizer);
-    }
-
     public void setTargetQueue(DispatchQueue targetQueue) {
         this.targetQueue = targetQueue;
     }
@@ -51,4 +44,6 @@
     public DispatchQueue getTargetQueue() {
         return this.targetQueue;
     }
+    
+
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=887630&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Sun Dec  6 02:52:35 2009
@@ -0,0 +1,29 @@
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class BaseRetained {
+    
+    final protected AtomicInteger reatinCounter = new AtomicInteger(1);
+    final protected AtomicReference<Runnable> shutdownHandler = new AtomicReference<Runnable>();
+
+    public void setShutdownHandler(Runnable finalizer) {
+        this.shutdownHandler.set(finalizer);
+    }
+    
+    public void retain() {
+        int prev = reatinCounter.getAndIncrement();
+        assert prev!=0;
+    }
+
+    public void release() {
+        if( reatinCounter.decrementAndGet()==0 ) {
+            Runnable value = shutdownHandler.getAndSet(null);
+            if( value!=null ) {
+                value.run();
+            }
+        }
+    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Sun Dec  6 02:52:35 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.dispatch.internal;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -31,7 +32,6 @@
 
     private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
     final private String label;
-    final private AtomicInteger reatinCounter = new AtomicInteger(1);
     final private AtomicInteger suspendCounter = new AtomicInteger();
     final private AtomicLong size = new AtomicLong();
     
@@ -57,10 +57,14 @@
         suspendCounter.incrementAndGet();
     }
 
-    public void dispatchAfter(long delayMS, Runnable runnable) {
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
         throw new RuntimeException("TODO: implement me.");
     }
 
+    public void execute(Runnable command) {
+        dispatchAsync(command);
+    }
+
     public void dispatchAsync(Runnable runnable) {
         if( runnable == null ) {
             throw new IllegalArgumentException();
@@ -112,18 +116,4 @@
         QueueSupport.dispatchApply(this, iterations, runnable);
     }
 
-    public void retain() {
-        int prev = reatinCounter.getAndIncrement();
-        assert prev!=0;
-    }
-
-    public void release() {
-        if( reatinCounter.decrementAndGet()==0 ) {
-            Runnable value = finalizer.getAndSet(null);
-            if( value!=null ) {
-                value.run();
-            }
-        }
-    }
-
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Sun Dec  6 02:52:35 2009
@@ -25,14 +25,18 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchSPI;
 import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.BaseRetained;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
-import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+import static org.apache.activemq.dispatch.DispatchPriority.*;
 
-public class AdvancedDispatchSPI implements DispatchSPI {
+import static org.apache.activemq.dispatch.DispatchPriority.*;
+
+
+public class AdvancedDispatchSPI extends BaseRetained implements DispatchSPI {
 
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[];
@@ -56,7 +60,7 @@
         
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
-            globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i]);
+            globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
         }
         
         loadBalancer = new SimpleLoadBalancer();
@@ -198,13 +202,17 @@
         return mainQueue;
     }
     
-    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+    public DispatchQueue getGlobalQueue() {
+        return getGlobalQueue(DEFAULT);
+    }
+
+    public DispatchQueue getGlobalQueue(DispatchPriority priority) {
         return globalQueues[priority.ordinal()];
     }
     
     public DispatchQueue createQueue(String label) {
         AdvancedSerialDispatchQueue rc = new AdvancedSerialDispatchQueue(label);
-        rc.setTargetQueue(getGlobalQueue(DEFAULT));
+        rc.setTargetQueue(getGlobalQueue());
         return rc;
     }
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Sun Dec  6 02:52:35 2009
@@ -24,8 +24,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchSystem;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
@@ -79,7 +79,7 @@
         
         this.dispatchQueues = new ThreadDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
-            dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchQueuePriority.values()[i]);
+            dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i]);
         }
 
         MAX_USER_PRIORITY = priorities - 1;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Sun Dec  6 02:52:35 2009
@@ -19,7 +19,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
@@ -30,9 +30,9 @@
 
     private final String label;
     private final AdvancedDispatchSPI spi;
-    private final DispatchQueuePriority priority;
+    private final DispatchPriority priority;
     
-    public GlobalDispatchQueue(AdvancedDispatchSPI spi, DispatchQueuePriority priority) {
+    public GlobalDispatchQueue(AdvancedDispatchSPI spi, DispatchPriority priority) {
         this.spi = spi;
         this.priority = priority;
         this.label=priority.toString();
@@ -42,12 +42,16 @@
         return label;
     }
 
+    public void execute(Runnable runnable) {
+        dispatchAsync(runnable);
+    }
+
     public void dispatchAsync(Runnable runnable) {
         spi.execute(runnable, priority.ordinal());
     }
 
-    public void dispatchAfter(long delayMS, Runnable runnable) {
-        spi.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        spi.schedule(runnable, priority.ordinal(), delay, unit); // pass the caller's unit through; hard-coding MILLISECONDS misread every other unit
     }
 
     public void dispatchSync(final Runnable runnable) throws InterruptedException {
@@ -74,7 +78,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setFinalizer(Runnable finalizer) {
+    public void setShutdownHandler(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Sun Dec  6 02:52:35 2009
@@ -19,7 +19,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
@@ -30,9 +30,9 @@
 
     private final String label;
     private final DispatcherThread dispatcher;
-    private final DispatchQueuePriority priority;
+    private final DispatchPriority priority;
     
-    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchQueuePriority priority) {
+    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchPriority priority) {
         this.priority = priority;
         this.label=priority.toString()+" "+dispatcher.getName();
         this.dispatcher = dispatcher;
@@ -42,12 +42,16 @@
         return label;
     }
 
+    public void execute(Runnable runnable) {
+        dispatchAsync(runnable);
+    }
+
     public void dispatchAsync(Runnable runnable) {
         dispatcher.execute(runnable, priority.ordinal());
     }
 
-    public void dispatchAfter(long delayMS, Runnable runnable) {
-        dispatcher.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        dispatcher.schedule(runnable, priority.ordinal(), delay, unit); // pass the caller's unit through; hard-coding MILLISECONDS misread every other unit
     }
 
     public void dispatchSync(final Runnable runnable) throws InterruptedException {
@@ -74,7 +78,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setFinalizer(Runnable finalizer) {
+    public void setShutdownHandler(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatchObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatchObserver.java?rev=887630&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatchObserver.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatchObserver.java Sun Dec  6 02:52:35 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.simple;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSource;
+
+public interface DispatchObserver {
+    
+    public void onThreadCreate(DispatcherThread thread);
+    public void onThreadDestroy(DispatcherThread thread);
+
+    public void onQueueCreate(DispatchQueue queue);
+    public void onQueueDestroy(DispatchQueue queue);
+    
+    public void onSourceCreate(DispatchSource source);
+    public void onSourceDestroy(DispatchSource source);
+    
+    public void onDispatchRequest(DispatchQueue target, Runnable request);
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Sun Dec  6 02:52:35 2009
@@ -19,8 +19,8 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchSystem;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
 
 /**
  * 
@@ -36,9 +36,9 @@
         this.spi = spi;
         this.threadQueues = new ThreadDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
-            threadQueues[i] = new ThreadDispatchQueue(this, DispatchQueuePriority.values()[i] );
+            threadQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i] );
         }
-        setName("dispatcher:"+(ordinal+1));
+        setName(spi.getLabel()+" dispatcher: "+(ordinal+1));
         setDaemon(true);
     }
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Sun Dec  6 02:52:35 2009
@@ -17,10 +17,11 @@
 package org.apache.activemq.dispatch.internal.simple;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
@@ -29,31 +30,35 @@
  */
 public class GlobalDispatchQueue implements SimpleQueue {
 
-    private final SimpleDispatchSPI system;
+    private final SimpleDispatchSPI spi;
     final String label;
     final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
     final AtomicLong counter;
-    private final DispatchQueuePriority priority;
+    private final DispatchPriority priority;
 
-    public GlobalDispatchQueue(SimpleDispatchSPI system, DispatchQueuePriority priority) {
-        this.system = system;
+    public GlobalDispatchQueue(SimpleDispatchSPI spi, DispatchPriority priority) {
+        this.spi = spi;
         this.priority = priority;
         this.label=priority.toString();
-        this.counter = system.globalQueuedRunnables;
+        this.counter = spi.globalQueuedRunnables;
     }
 
     public String getLabel() {
         return label;
     }
 
+    public void execute(Runnable runnable) {
+        dispatchAsync(runnable);
+    }
+
     public void dispatchAsync(Runnable runnable) {
         this.counter.incrementAndGet();
         runnables.add(runnable);
-        system.wakeup();
+        spi.wakeup();
     }
 
-    public void dispatchAfter(long delayMS, Runnable runnable) {
-        throw new RuntimeException("TODO: implement me.");
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        spi.timerThread.addRelative(runnable, this, delay, unit);
     }
 
     public void dispatchSync(final Runnable runnable) throws InterruptedException {
@@ -80,7 +85,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setFinalizer(Runnable finalizer) {
+    public void setShutdownHandler(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 
@@ -100,7 +105,7 @@
         return rc;
     }
 
-    public DispatchQueuePriority getPriority() {
+    public DispatchPriority getPriority() {
         return priority;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java Sun Dec  6 02:52:35 2009
@@ -18,16 +18,19 @@
 
 import java.nio.channels.SelectableChannel;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchSPI;
 import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.BaseRetained;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
-import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+import static org.apache.activemq.dispatch.DispatchPriority.*;
+
 
 
 /**
@@ -35,7 +38,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class SimpleDispatchSPI implements DispatchSPI {
+public class SimpleDispatchSPI extends BaseRetained implements DispatchSPI {
         
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[]; 
@@ -45,26 +48,38 @@
     final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
     final AtomicInteger waitingDispatcherCount = new AtomicInteger();
     final AtomicInteger startCounter = new AtomicInteger();
+    private final String label;
+    TimerThread timerThread;
     
-    public SimpleDispatchSPI(int size) {
+    public SimpleDispatchSPI(String label, int size) {
+        this.label = label;
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
-            globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i] );
+            globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i] );
         }
         dispatchers = new DispatcherThread[size];
     }
-    
+
     public DispatchQueue getMainQueue() {
         return mainQueue;
     }
     
-    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+    public DispatchQueue getGlobalQueue() {
+        return getGlobalQueue(DEFAULT);
+    }
+
+    public DispatchQueue getGlobalQueue(DispatchPriority priority) {
         return globalQueues[priority.ordinal()];
     }
     
     public DispatchQueue createQueue(String label) {
-        SerialDispatchQueue rc = new SerialDispatchQueue(label);
-        rc.setTargetQueue(getGlobalQueue(DEFAULT));
+        SerialDispatchQueue rc = new SerialDispatchQueue(label) {
+            @Override
+            public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+                timerThread.addRelative(runnable, this, delay, unit);
+            }
+        };
+        rc.setTargetQueue(getGlobalQueue());
         return rc;
     }
     
@@ -98,26 +113,34 @@
                 dispatchers[i] = new DispatcherThread(this, i);
                 dispatchers[i].start();
             }
+            timerThread = new TimerThread(this);
+            timerThread.start();
         }
     }
 
     public void shutdown(final Runnable onShutdown) {
         if( startCounter.decrementAndGet()==0 ) {
             
-            final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
+            final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length+1);
+            Runnable wrapper = new Runnable() {
+                public void run() {
+                    if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                        onShutdown.run();
+                    }
+                    throw new DispatcherThread.Shutdown();
+                }
+            };
+
+            timerThread.shutdown(wrapper);
             for (int i = 0; i < dispatchers.length; i++) {
                 ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
-                queue.runnables.add(new Runnable() {
-                    public void run() {
-                        if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
-                            onShutdown.run();
-                        }
-                        throw new DispatcherThread.Shutdown();
-                    }
-                });
+                queue.runnables.add(wrapper);
             }
         }
     }
 
+    public String getLabel() {
+        return label;
+    }
     
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java Sun Dec  6 02:52:35 2009
@@ -1,10 +1,10 @@
 package org.apache.activemq.dispatch.internal.simple;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchPriority;
 
 public interface SimpleQueue extends DispatchQueue {
 
     Runnable poll();
-    DispatchQueuePriority getPriority();
+    DispatchPriority getPriority();
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Sun Dec  6 02:52:35 2009
@@ -18,10 +18,11 @@
 
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
@@ -35,9 +36,9 @@
     final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
     private DispatcherThread dispatcher;
     final AtomicLong counter;
-    private final DispatchQueuePriority priority;
+    private final DispatchPriority priority;
     
-    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchQueuePriority priority) {
+    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchPriority priority) {
         this.dispatcher = dispatcher;
         this.priority = priority;
         this.label=priority.toString();
@@ -48,6 +49,10 @@
         return label;
     }
 
+    public void execute(Runnable runnable) {
+        dispatchAsync(runnable);
+    }
+    
     public void dispatchAsync(Runnable runnable) {
         // We don't have to take the synchronization hit 
         // if the current thread is the dispatcher since we know it's not
@@ -79,7 +84,7 @@
         return rc;
     }
 
-    public void dispatchAfter(long delayMS, Runnable runnable) {
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
         throw new RuntimeException("TODO: implement me.");
     }
 
@@ -107,7 +112,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setFinalizer(Runnable finalizer) {
+    public void setShutdownHandler(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 
@@ -118,8 +123,7 @@
         throw new UnsupportedOperationException();
     }
     
-    
-    public DispatchQueuePriority getPriority() {
+    public DispatchPriority getPriority() {
         return priority;
     }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java?rev=887630&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java Sun Dec  6 02:52:35 2009
@@ -0,0 +1,138 @@
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.util.TimerHeap;
+
+import static org.apache.activemq.dispatch.internal.simple.TimerThread.Type.*;
+
+public class TimerThread extends Thread { // Thread that collects timer requests and hands their runnables to target queues via a TimerHeap.
+    enum Type { // Kind of request queued for the timer thread.
+        RELATIVE, // run() forwards time/unit to TimerHeap.addRelative
+        ABSOLUTE, // run() forwards time/unit to TimerHeap.addAbsolute
+        SHUTDOWN // run() invokes the request's runnable (if any) and then returns
+    }
+    final private static class TimerRequest { // Plain mutable holder for one queued request.
+        Type type;
+        long time; // delay (RELATIVE) or time (ABSOLUTE), read together with unit; not set by shutdown()
+        TimeUnit unit;
+        Runnable runnable;
+        DispatchQueue target; // queue that receives the runnable via dispatchAsync; not set by shutdown()
+    }
+
+    private final Object mutex = new Object(); // guards 'requests' and is the monitor run() waits on
+    private ArrayList<TimerRequest> requests = new ArrayList<TimerRequest>(); // requests added but not yet picked up by run()
+    // Names the thread "<spi label> timer" and marks it as a daemon; the spi reference is not retained.
+    public TimerThread(SimpleDispatchSPI spi) {
+        setName(spi.getLabel()+" timer");
+        setDaemon(true);
+    }
+    // Queues an ABSOLUTE request; run() later passes it to TimerHeap.addAbsolute with the given time and unit.
+    public final void addAbsolute(Runnable runnable, DispatchQueue target, long time, TimeUnit unit) {
+        TimerRequest request = new TimerRequest();
+        request.type = ABSOLUTE;
+        request.time = time;
+        request.unit = unit;
+        request.runnable = runnable;
+        request.target = target;
+        add(request);
+    }
+    // Queues a RELATIVE request; run() later passes it to TimerHeap.addRelative with the given delay and unit.
+    public final void addRelative(Runnable runnable, DispatchQueue target, long delay, TimeUnit unit) {
+        TimerRequest request = new TimerRequest();
+        request.type = RELATIVE;
+        request.time = delay;
+        request.unit = unit;
+        request.runnable = runnable;
+        request.target = target;
+        add(request);
+    }
+    // Queues a SHUTDOWN request; run() calls onShutdown (when non-null) on the timer thread itself and then returns.
+    public final void shutdown(Runnable onShutdown) {
+        TimerRequest request = new TimerRequest();
+        request.type = SHUTDOWN;
+        request.runnable = onShutdown;
+        add(request);
+    }
+    // Appends the request under the mutex and notifies the monitor in case run() is waiting on it.
+    private void add(TimerRequest request) {
+        synchronized(mutex) {
+            requests.add(request);
+            mutex.notify();
+        }
+    }
+    // Main loop: moves queued requests into a TimerHeap, executes ready timers, then spins or waits until the next one is due.
+    public void run() {
+        // The heap's execute hook hands the runnable to the request's target queue via dispatchAsync.
+        final TimerHeap<TimerRequest> timerHeap = new TimerHeap<TimerRequest>() {
+            @Override
+            protected final void execute(TimerRequest request) {
+                request.target.dispatchAsync(request.runnable);
+            }
+        };
+        
+        ArrayList<TimerRequest> swaped = new ArrayList<TimerRequest>(); // starts empty; exchanged with 'requests' on every iteration
+        
+        try {
+            for(;;) {
+
+                synchronized(mutex) {
+                    // Swap the lists so the pending batch is processed below without holding the mutex.
+                    ArrayList<TimerRequest> t = requests;
+                    requests = swaped;
+                    swaped = t;
+                }
+                
+                if( !swaped.isEmpty() ) {
+                    for (TimerRequest request : swaped) {
+                        switch( request.type ) {
+                        case RELATIVE:
+                            timerHeap.addRelative(request, request.time, request.unit);
+                            break;
+                        case ABSOLUTE:
+                            timerHeap.addAbsolute(request, request.time, request.unit);
+                            break;
+                        case SHUTDOWN:
+                            if( request.runnable!=null ) {
+                                request.runnable.run();
+                            }
+                            return; // leaves run() at once; any later requests in this batch are not processed
+                        }
+                    }
+                    swaped.clear();
+                }
+                
+                timerHeap.executeReadyTimers();
+
+                long start = System.nanoTime();
+                long next = timerHeap.timeToNext(TimeUnit.NANOSECONDS); // handled below: 0 loops again, 1..999 spins, -1 waits without timeout
+                
+                if( next==0 ) {
+                    continue;
+                }
+                
+                // If the next timer is less than 1000 ns away, busy-wait instead of blocking on the monitor.
+                if( next>0 && next < 1000 ) {
+                    while( System.nanoTime()-start < next ) {
+                    }
+                    continue;
+                }
+                
+                long waitms = next / 1000000; // split the nanosecond delay into the (millis, nanos) pair taken by Object.wait
+                int waitns = (int) (next % 1000000);
+                synchronized(mutex) {
+                    if( requests.isEmpty() ) { // skip the wait when new requests arrived since the swap
+                        if(next==-1) { // presumably means "no timer pending"; TimerHeap is not shown here -- TODO confirm
+                            mutex.wait();
+                        }  else {
+                            mutex.wait(waitms, waitns);
+                        }
+                    }
+                }                
+            }
+        } catch (InterruptedException e) { // NOTE(review): an interrupt silently ends the thread; the interrupt flag is not restored
+        }
+    }
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887630&r1=887629&r2=887630&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Sun Dec  6 02:52:35 2009
@@ -22,8 +22,9 @@
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
 
+import static org.apache.activemq.dispatch.DispatchPriority.*;
+
 import static java.lang.String.*;
-import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
 
 /**
  * 
@@ -41,7 +42,7 @@
         advancedSystem.shutdown(latch);
         latch.await();
 
-        DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
+        DispatchSPI simpleSystem = new SimpleDispatchSPI("test", Runtime.getRuntime().availableProcessors());
         simpleSystem.start();
         
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));



Mime
View raw message