activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887333 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activ...
Date Fri, 04 Dec 2009 19:48:22 GMT
Author: chirino
Date: Fri Dec  4 19:48:21 2009
New Revision: 887333

URL: http://svn.apache.org/viewvc?rev=887333&view=rev
Log:
AdvancedDispatchSPI is now used directly by the rest of the activemq modules.


Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
      - copied, changed from r887252, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Fri Dec  4 19:48:21 2009
@@ -25,7 +25,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -42,7 +43,7 @@
     protected int inputResumeThreshold = 512 * 1024;
     protected boolean useAsyncWriteThread = true;
 
-    private Dispatcher dispatcher;
+    private AdvancedDispatchSPI dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private ExecutorService blockingWriter;
     private ExceptionListener exceptionListener;
@@ -170,11 +171,11 @@
         this.priorityLevels = priorityLevels;
     }
 
-    public Dispatcher getDispatcher() {
+    public AdvancedDispatchSPI getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Fri Dec  4 19:48:21 2009
@@ -25,8 +25,10 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.apollo.Connection;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -51,7 +53,7 @@
 
     private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
     private VirtualHost defaultVirtualHost;
-    private Dispatcher dispatcher;
+    private AdvancedDispatchSPI dispatcher;
     private File dataDirectory;
 
     private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -129,7 +131,7 @@
 		// apply some default configuration to this broker instance before it's started.
 		if( dispatcher == null ) {
 			int threads = Runtime.getRuntime().availableProcessors();
-			dispatcher = DispatcherThread.createPriorityDispatchPool("Broker: "+getDefaultVirtualHost().getHostName(), Broker.MAX_PRIORITY, threads);
+			dispatcher = new AdvancedDispatchSPI(threads, Broker.MAX_PRIORITY);
 		}
 		
 
@@ -376,10 +378,10 @@
     // /////////////////////////////////////////////////////////////////
     // Property Accessors
     // /////////////////////////////////////////////////////////////////
-    public Dispatcher getDispatcher() {
+    public AdvancedDispatchSPI getDispatcher() {
         return dispatcher;
     }
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
     	assertInConfigurationState();
         this.dispatcher = dispatcher;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Fri Dec  4 19:48:21 2009
@@ -40,8 +40,8 @@
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -71,7 +71,7 @@
     private final FlowController<OperationBase<?>> storeController;
     private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
-    private Dispatcher dispatcher;
+    private AdvancedDispatchSPI dispatcher;
     private Thread flushThread;
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
@@ -1288,11 +1288,11 @@
         return store.allocateStoreTracking();
     }
 
-    public Dispatcher getDispatcher() {
+    public AdvancedDispatchSPI getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Fri Dec  4 19:48:21 2009
@@ -24,7 +24,7 @@
 
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.flow.SizeLimiter;
@@ -52,7 +52,7 @@
     private static final boolean USE_PRIORITY_QUEUES = true;
 
     private BrokerDatabase database;
-    private Dispatcher dispatcher;
+    private AdvancedDispatchSPI dispatcher;
 
     private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
     private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,7 +226,7 @@
         this.database = database;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Dec  4 19:48:21 2009
@@ -30,7 +30,8 @@
 import org.apache.activemq.apollo.broker.Router;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
@@ -87,7 +88,7 @@
     protected Broker sendBroker;
     protected Broker rcvBroker;
     protected ArrayList<Broker> brokers = new ArrayList<Broker>();
-    protected Dispatcher dispatcher;
+    protected AdvancedDispatchSPI dispatcher;
     protected final AtomicLong msgIdGenerator = new AtomicLong();
     protected final AtomicBoolean stopping = new AtomicBoolean();
 
@@ -134,8 +135,8 @@
 
     protected abstract String getRemoteWireFormat();
 
-    protected Dispatcher createDispatcher() {
-        return DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+    protected AdvancedDispatchSPI createDispatcher() {
+        return new AdvancedDispatchSPI(asyncThreadPoolSize, Broker.MAX_PRIORITY);
     }
 
     @Test

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Fri Dec  4 19:48:21 2009
@@ -27,7 +27,8 @@
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.queue.IQueue;
 
@@ -38,7 +39,7 @@
 public class SharedQueueTest extends TestCase {
 
 
-    Dispatcher dispatcher;
+    AdvancedDispatchSPI dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -47,8 +48,8 @@
 
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected Dispatcher createDispatcher() {
-        return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+    protected AdvancedDispatchSPI createDispatcher() {
+        return new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), Broker.MAX_PRIORITY);
     }
 
     protected int consumerStartDelay = 0;

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java?rev=887333&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java Fri Dec  4 19:48:21 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+import java.nio.channels.SelectableChannel;
+
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+public interface DispatchSPI {
+    public void start();
+    public void shutdown(Runnable onShutdown);
+    
+    public DispatchQueue getMainQueue();
+    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
+    public DispatchQueue createQueue(String label);
+    public void dispatchMain();
+    public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec  4 19:48:21 2009
@@ -32,14 +32,6 @@
         LOW;
     }
 
-    static abstract public class DispatchSPI {
-        abstract public DispatchQueue getMainQueue();
-        abstract public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
-        abstract public DispatchQueue createQueue(String label);
-        abstract public void dispatchMain();
-        abstract public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
-    }
-
     public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
     static public DispatchQueue getCurrentQueue() {
         return CURRENT_QUEUE.get();

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (from r887252, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java&r1=887252&r2=887333&rev=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java Fri Dec  4 19:48:21 2009
@@ -1,4 +1,6 @@
-package org.apache.activemq.dispatch.internal.advanced;
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 
 /**
  * Handy interface to signal classes which would like an IDispatcher instance
@@ -8,6 +10,6 @@
  */
 public interface DispatcherAware {
 
-	public void setDispatcher(Dispatcher dispatcher);
+	public void setDispatcher(AdvancedDispatchSPI dispatcher);
 	
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887333&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Fri Dec  4 19:48:21 2009
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.nio.channels.SelectableChannel;
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSPI;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
+
+import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+
+public class AdvancedDispatchSPI implements DispatchSPI {
+
+    final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+    final GlobalDispatchQueue globalQueues[];
+    final AtomicLong globalQueuedRunnables = new AtomicLong();
+
+    private final ThreadLocal<DispatcherThread> dispatcher = new ThreadLocal<DispatcherThread>();
+    private final ThreadLocal<PooledDispatchContext> dispatcherContext = new ThreadLocal<PooledDispatchContext>();
+    private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
+
+    final AtomicInteger startCounter = new AtomicInteger();
+//    final AtomicBoolean started = new AtomicBoolean();
+//    final AtomicBoolean shutdown = new AtomicBoolean();
+
+    private int roundRobinCounter = 0;
+    private int size;
+    private final int numPriorities;
+
+    protected LoadBalancer loadBalancer;
+
+    public AdvancedDispatchSPI(int size, int numPriorities) {
+        this.size = size;
+        this.numPriorities = numPriorities;
+        
+        globalQueues = new GlobalDispatchQueue[3];
+        for (int i = 0; i < 3; i++) {
+            globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i]);
+        }
+        
+        loadBalancer = new SimpleLoadBalancer();
+    }
+
+    /**
+     * Subclasses should implement this to return a new dispatcher.
+     * 
+     * @param name
+     *            The name to assign the dispatcher.
+     * @param pool
+     *            The pool.
+     * @return The new dispatcher.
+     */
+    protected DispatcherThread createDispatcher(String name) throws Exception {
+        return new DispatcherThread(this, name, numPriorities);
+    }
+
+    /**
+     * @see org.apache.activemq.dispatch.internal.advanced.Dispatcher#start()
+     */
+    public synchronized final void start()  {
+        if( startCounter.getAndIncrement()==0 ) {
+            // Create all the workers.
+            try {
+                loadBalancer.start();
+                for (int i = 0; i < size; i++) {
+                    DispatcherThread dispatacher = createDispatcher("dispatcher -" + (i + 1));
+                    dispatchers.add(dispatacher);
+                    dispatacher.start();
+                }
+            } catch (Exception e) {
+                shutdown();
+            }
+        }
+    }
+
+    public final void shutdown() {
+        shutdown(null);
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public final void shutdown(Runnable onShutdown) {
+        if( startCounter.decrementAndGet()==0 ) {
+            final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
+            for (DispatcherThread d : new ArrayList<DispatcherThread>(dispatchers)) {
+                d.shutdown(shutdownCountDown, onShutdown);
+            }
+            loadBalancer.stop();
+        }
+    }
+
+    public void setCurrentDispatchContext(PooledDispatchContext context) {
+        dispatcherContext.set(context);
+    }
+
+    public PooledDispatchContext getCurrentDispatchContext() {
+        return dispatcherContext.get();
+    }
+
+    /**
+     * Returns the currently executing dispatcher, or null if the current thread
+     * is not a dispatcher:
+     * 
+     * @return The currently executing dispatcher
+     */
+    public Dispatcher getCurrentDispatcher() {
+        return dispatcher.get();
+    }
+
+    /**
+     * A Dispatcher must call this to indicate that it has started its dispatch
+     * loop.
+     */
+    public void onDispatcherStarted(DispatcherThread d) {
+        dispatcher.set(d);
+        loadBalancer.onDispatcherStarted(d);
+    }
+
+    public LoadBalancer getLoadBalancer() {
+        return loadBalancer;
+    }
+
+    /**
+     * A Dispatcher must call this when exiting its dispatch loop
+     */
+    public void onDispatcherStopped(Dispatcher d) {
+        synchronized (dispatchers) {
+            if (dispatchers.remove(d)) {
+                size--;
+            }
+        }
+        loadBalancer.onDispatcherStopped(d);
+    }
+
+    protected DispatcherThread chooseDispatcher() {
+        DispatcherThread d = dispatcher.get();
+        if (d == null) {
+            synchronized (dispatchers) {
+                if(dispatchers.isEmpty())
+                {
+                    throw new RejectedExecutionException();
+                }
+                if (++roundRobinCounter >= size) {
+                    roundRobinCounter = 0;
+                }
+                return dispatchers.get(roundRobinCounter);
+            }
+        } else {
+            return d;
+        }
+    }
+
+    public DispatchContext register(Runnable runnable, String name) {
+        return chooseDispatcher().register(runnable, name);
+    }
+
+	public int getSize() {
+		return size;
+	}
+	
+    public final Executor createPriorityExecutor(final int priority) {
+        return new Executor() {
+            public void execute(final Runnable runnable) {
+                chooseDispatcher().dispatch(runnable, priority);
+            }
+
+        };
+    }
+
+    public int getDispatchPriorities() {
+        // TODO Auto-generated method stub
+        return numPriorities;
+    }
+
+    public void execute(Runnable command) {
+        chooseDispatcher().dispatch(command, 0);
+    }
+    
+    public void execute(Runnable command, int priority) {
+        chooseDispatcher().dispatch(command, priority);
+    }
+
+    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+        chooseDispatcher().schedule(runnable, delay, timeUnit);
+    }
+
+    public void schedule(final Runnable runnable, int priority, long delay, TimeUnit timeUnit) {
+        chooseDispatcher().schedule(runnable, priority, delay, timeUnit);
+    }
+    
+    public DispatchQueue getMainQueue() {
+        return mainQueue;
+    }
+    
+    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+        return globalQueues[priority.ordinal()];
+    }
+    
+    public DispatchQueue createQueue(String label) {
+        SerialDispatchQueue rc = new SerialDispatchQueue(label);
+        rc.setTargetQueue(getGlobalQueue(DEFAULT));
+        return rc;
+    }
+    
+    public void dispatchMain() {
+        mainQueue.run();
+    }
+
+    public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+        return null;
+    }    
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec  4 19:48:21 2009
@@ -24,6 +24,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.DispatchObserver;
 import org.apache.activemq.dispatch.DispatchSystem;
@@ -46,7 +47,7 @@
     protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
 
     // Set if this dispatcher is part of a dispatch pool:
-    protected final DispatcherPool dispatcherPool;
+    protected final AdvancedDispatchSPI spi;
 
     // The local dispatch queue:
     protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
@@ -74,7 +75,7 @@
         }
     };
 
-    protected DispatcherThread(String name, int priorities, DispatcherPool pooledDispactcher) {
+    protected DispatcherThread(AdvancedDispatchSPI spi, String name, int priorities) {
         this.name = name;
         
         this.dispatchQueues = new ThreadDispatchQueue[3];
@@ -88,15 +89,7 @@
         for (int i = 0; i < 2; i++) {
             foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
         }
-        this.dispatcherPool = pooledDispactcher;
-    }
-
-    public static final Dispatcher createPriorityDispatcher(String name, int numPriorities) {
-        return new DispatcherThread(name, numPriorities, null);
-    }
-
-    public static final Dispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
-        return new DispatcherPool(name, size, numPriorities);
+        this.spi = spi;
     }
 
     @SuppressWarnings("unchecked")
@@ -167,21 +160,33 @@
      * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
      */
     public void shutdown() throws InterruptedException {
-        Thread joinThread = null;
+        Thread joinThread = shutdown(new AtomicInteger(1), null);
+        if (joinThread != null) {
+            // thread.interrupt();
+            joinThread.join();
+        }
+    }
+    
+    public Thread shutdown(final AtomicInteger shutdownCountDown, final Runnable onShutdown) {
         synchronized (this) {
             if (thread != null) {
                 dispatchInternal(new Runnable() {
                     public void run() {
                         running = false;
+                        if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                            onShutdown.run();
+                        }
                     }
                 }, MAX_USER_PRIORITY + 1);
-                joinThread = thread;
+                Thread rc = thread;
                 thread = null;
+                return rc;
+            } else {
+                if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                    onShutdown.run();
+                }
             }
-        }
-        if (joinThread != null) {
-            // thread.interrupt();
-            joinThread.join();
+            return null;
         }
     }
 
@@ -200,9 +205,9 @@
 
     public void run() {
 
-        if (dispatcherPool != null) {
+        if (spi != null) {
             // Inform the dispatcher that we have started:
-            dispatcherPool.onDispatcherStarted((DispatcherThread) this);
+            spi.onDispatcherStarted((DispatcherThread) this);
         }
 
         PriorityDispatchContext pdc;
@@ -216,14 +221,14 @@
                     }
                     
                     if (pdc.tracker != null) {
-                        dispatcherPool.setCurrentDispatchContext(pdc);
+                        spi.setCurrentDispatchContext(pdc);
                     }
 
                     counter++;
                     pdc.run();
 
                     if (pdc.tracker != null) {
-                        dispatcherPool.setCurrentDispatchContext(null);
+                        spi.setCurrentDispatchContext(null);
                     }
                 }
 
@@ -263,8 +268,8 @@
         } catch (Throwable thrown) {
             thrown.printStackTrace();
         } finally {
-            if (dispatcherPool != null) {
-                dispatcherPool.onDispatcherStopped((DispatcherThread) this);
+            if (spi != null) {
+                spi.onDispatcherStopped((DispatcherThread) this);
             }
             cleanup();
         }
@@ -421,8 +426,8 @@
     }
 
     private final DispatcherThread getCurrentDispatcher() {
-        if (dispatcherPool != null) {
-            return (DispatcherThread) dispatcherPool.getCurrentDispatcher();
+        if (spi != null) {
+            return (DispatcherThread) spi.getCurrentDispatcher();
         } else if (Thread.currentThread() == thread) {
             return (DispatcherThread) this;
         } else {
@@ -432,7 +437,7 @@
     }
 
     private final PooledDispatchContext getCurrentDispatchContext() {
-        return dispatcherPool.getCurrentDispatchContext();
+        return spi.getCurrentDispatchContext();
     }
 
     /**
@@ -464,8 +469,8 @@
             this.runnable = runnable;
             this.name = name;
             this.currentOwner = (DispatcherThread) DispatcherThread.this;
-            if (persistent && dispatcherPool != null) {
-                this.tracker = dispatcherPool.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
+            if (persistent && spi != null) {
+                this.tracker = spi.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
             } else {
                 this.tracker = null;
             }
@@ -688,4 +693,5 @@
     public String getName() {
         return name;
     }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec  4 19:48:21 2009
@@ -29,15 +29,13 @@
 public class GlobalDispatchQueue implements DispatchQueue {
 
     private final String label;
-    private final AdancedDispatchSPI system;
-    private final DispatcherPool dispatcher;
+    private final AdvancedDispatchSPI spi;
     private final DispatchQueuePriority priority;
     
-    public GlobalDispatchQueue(AdancedDispatchSPI system, DispatchQueuePriority priority) {
-        this.system = system;
+    public GlobalDispatchQueue(AdvancedDispatchSPI spi, DispatchQueuePriority priority) {
+        this.spi = spi;
         this.priority = priority;
         this.label=priority.toString();
-        this.dispatcher = this.system.pooledDispatcher;
     }
 
     public String getLabel() {
@@ -45,11 +43,11 @@
     }
 
     public void dispatchAsync(Runnable runnable) {
-        dispatcher.execute(runnable, priority.ordinal());
+        spi.execute(runnable, priority.ordinal());
     }
 
     public void dispatchAfter(long delayMS, Runnable runnable) {
-        dispatcher.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
+        spi.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
     }
 
     public void dispatchSync(final Runnable runnable) throws InterruptedException {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Fri Dec  4 19:48:21 2009
@@ -29,9 +29,9 @@
 final public class DispatcherThread extends Thread {
     private static final int MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO = 10000;
     private final SimpleDispatchSPI spi;
-    private final ThreadDispatchQueue[] threadQueues;
+    final ThreadDispatchQueue[] threadQueues;
     final AtomicLong threadQueuedRunnables = new AtomicLong();
-    
+        
     public DispatcherThread(SimpleDispatchSPI spi, int ordinal) {
         this.spi = spi;
         this.threadQueues = new ThreadDispatchQueue[3];
@@ -44,42 +44,43 @@
     
     @Override
     public void run() {
-        outer: while( true ) {
-            int counter=0;
-            for (SimpleQueue queue : threadQueues) {
-                DispatchSystem.CURRENT_QUEUE.set(queue);
-                Runnable runnable;
-                while( (runnable = queue.poll())!=null ) {
-                    dispatch(runnable);
-                    counter++;
+        try {
+            outer: while( true ) {
+                int counter=0;
+                for (SimpleQueue queue : threadQueues) {
+                    DispatchSystem.CURRENT_QUEUE.set(queue);
+                    Runnable runnable;
+                    while( (runnable = queue.poll())!=null ) {
+                        dispatch(runnable);
+                        counter++;
+                    }
+                }
+                if( counter!=0 ) {
+                    // don't service the global queues until the thread queues are 
+                    // drained.
+                    continue;
                 }
-            }
-            if( counter!=0 ) {
-                // don't service the global queues until the thread queues are 
-                // drained.
-                continue;
-            }
-            
-            for (SimpleQueue queue : spi.globalQueues) {
-                DispatchSystem.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
                 
-                Runnable runnable;
-                while( (runnable = queue.poll())!=null ) {
-                    dispatch(runnable);
-                    counter++;
+                for (SimpleQueue queue : spi.globalQueues) {
+                    DispatchSystem.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
                     
-                    // Thread queues have the priority.
-                    if( threadQueuedRunnables.get()!=0 ) {
-                        continue outer;
+                    Runnable runnable;
+                    while( (runnable = queue.poll())!=null ) {
+                        dispatch(runnable);
+                        counter++;
+                        
+                        // Thread queues have the priority.
+                        if( threadQueuedRunnables.get()!=0 ) {
+                            continue outer;
+                        }
                     }
                 }
-            }
-            if( counter!=0 ) {
-                // don't wait for wake up until we could find 
-                // no runnables to dispatch.
-                continue;
-            }
-        
+                if( counter!=0 ) {
+                    // don't wait for wake up until we could find 
+                    // no runnables to dispatch.
+                    continue;
+                }
+            
 //        GlobalDispatchQueue[] globalQueues = spi.globalQueues;
 //        while( true ) {
 //
@@ -93,14 +94,20 @@
 //                continue;
 //            }
 //        
-            try {
-                waitForWakeup();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-                return;
+                try {
+                    waitForWakeup();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                    return;
+                }
             }
+        } catch (Shutdown e) {
         }
     }
+    
+    @SuppressWarnings("serial")
+    static class Shutdown extends RuntimeException {
+    }
 
     private boolean dispatch(SimpleQueue queue) {
         int counter=0;
@@ -122,6 +129,8 @@
     private void dispatch(Runnable runnable) {
         try {
             runnable.run();
+        } catch (Shutdown e) {
+            throw e;
         } catch (Throwable e) {
             e.printStackTrace();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java Fri Dec  4 19:48:21 2009
@@ -22,9 +22,9 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSPI;
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
@@ -35,7 +35,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class SimpleDispatchSPI extends DispatchSPI {
+public class SimpleDispatchSPI implements DispatchSPI {
         
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[]; 
@@ -44,20 +44,14 @@
     
     final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
     final AtomicInteger waitingDispatcherCount = new AtomicInteger();
-
+    final AtomicInteger startCounter = new AtomicInteger();
     
     public SimpleDispatchSPI(int size) {
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i] );
         }
-                                  
         dispatchers = new DispatcherThread[size];
-        for (int i = 0; i < size; i++) {
-            dispatchers[i] = new DispatcherThread(this, i);
-            dispatchers[i].start();
-            
-        }
     }
     
     public DispatchQueue getMainQueue() {
@@ -98,5 +92,32 @@
         }
     }
 
+    public void start() {
+        if( startCounter.getAndIncrement()==0 ) {
+            for (int i = 0; i < dispatchers.length; i++) {
+                dispatchers[i] = new DispatcherThread(this, i);
+                dispatchers[i].start();
+            }
+        }
+    }
+
+    public void shutdown(final Runnable onShutdown) {
+        if( startCounter.decrementAndGet()==0 ) {
+            
+            final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
+            for (int i = 0; i < dispatchers.length; i++) {
+                ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
+                queue.runnables.add(new Runnable() {
+                    public void run() {
+                        if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                            onShutdown.run();
+                        }
+                        throw new DispatcherThread.Shutdown();
+                    }
+                });
+            }
+        }
+    }
+
     
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec  4 19:48:21 2009
@@ -18,8 +18,7 @@
 
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
 
 import static java.lang.String.*;
@@ -30,15 +29,35 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class DispatchSystemTest {
+
+    public static class RunnableCountDownLatch extends CountDownLatch implements Runnable {
+        public RunnableCountDownLatch(int count) {
+            super(count);
+        }
+        public void run() {
+            countDown();
+        }
+    }
     
     public static void main(String[] args) throws Exception {
-        DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+        DispatchSPI advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
+        advancedSystem.start();
         benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
         benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
 
+        RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
+        advancedSystem.shutdown(latch);
+        latch.await();
+
         DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
+        simpleSystem.start();
+        
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
         benchmark("simple private serial queue", simpleSystem, simpleSystem.createQueue("test"));
+
+        latch = new RunnableCountDownLatch(1);
+        simpleSystem.shutdown(latch);
+        latch.await();
     }
 
     private static void benchmark(String name, DispatchSPI spi, DispatchQueue queue) throws InterruptedException {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec  4 19:48:21 2009
@@ -28,7 +28,7 @@
 public class DispatcherPoolTest {
     
     public static void main(String[] args) throws Exception {
-        DispatcherPool pooledDispatcher = new DispatcherPool("default", Runtime.getRuntime().availableProcessors(), 3);
+        AdvancedDispatchSPI pooledDispatcher = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
         pooledDispatcher.start();
         
         // warm the JIT up..
@@ -46,7 +46,7 @@
         System.out.println(format("duration: %,.3f ms, rate: %,.2f executions/sec", durationMS, rate));
     }
 
-    private static void benchmarkWork(final DispatcherPool pooledDispatcher, int iterations) throws InterruptedException {
+    private static void benchmarkWork(final AdvancedDispatchSPI pooledDispatcher, int iterations) throws InterruptedException {
         final CountDownLatch counter = new CountDownLatch(iterations);
         for (int i = 0; i < 1000; i++) {
             Work dispatchable = new Work(counter, pooledDispatcher);
@@ -59,9 +59,9 @@
         private final CountDownLatch counter;
         private final DispatchContext context;
 
-        private Work(CountDownLatch counter, DispatcherPool pooledDispatcher) {
+        private Work(CountDownLatch counter, AdvancedDispatchSPI spi) {
             this.counter = counter;
-            this.context = pooledDispatcher.register(this , "test");
+            this.context = spi.register(this , "test");
         }
 
         public void run() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java Fri Dec  4 19:48:21 2009
@@ -22,7 +22,8 @@
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.activemq.apollo.broker.Broker;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
 @XmlRootElement(name="dispatcher")
@@ -36,12 +37,12 @@
 	@XmlAttribute(required=false)
 	int threads = Runtime.getRuntime().availableProcessors();
 	
-	public Dispatcher createDispatcher(BrokerXml brokerXml) {
+	public AdvancedDispatchSPI createDispatcher(BrokerXml brokerXml) {
 		if( name == null ) {
 //			VirtualHostXml vh = brokerXml.getDefaultVirtualHost();
 			name = "Broker: ";
 		}
-		return DispatcherThread.createPriorityDispatchPool(name, maxPriority, threads);
+		return new AdvancedDispatchSPI(threads, maxPriority);
 	}
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Fri Dec  4 19:48:21 2009
@@ -26,7 +26,7 @@
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerFactory;
 import org.apache.activemq.broker.store.memory.MemoryStore;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherPool;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Test;
@@ -44,9 +44,9 @@
 		LOG.info("Loading broker configuration from the classpath with URI: " + uri);
 		Broker broker = BrokerFactory.createBroker(uri);
 		
-		DispatcherPool p = (DispatcherPool)broker.getDispatcher();
-		assertEquals(4, p.getSize());
-		assertEquals("test dispatcher", p.getName());
+		AdvancedDispatchSPI p = (AdvancedDispatchSPI)broker.getDispatcher();
+//		assertEquals(4, p.getSize());
+//		assertEquals("test dispatcher", p.getName());
 		
 		
 		assertEquals(1, broker.getTransportServers().size());

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec  4 19:48:21 2009
@@ -39,9 +39,8 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -65,7 +64,7 @@
 
     private static int PERFORMANCE_SAMPLES = 5;
 
-    Dispatcher dispatcher;
+    AdvancedDispatchSPI dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -82,12 +81,8 @@
     protected ArrayList<Producer> producers = new ArrayList<Producer>();
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected Dispatcher createDispatcher() {
-        if (THREAD_POOL_SIZE > 1) {
-            return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, THREAD_POOL_SIZE);
-        } else {
-            return DispatcherThread.createPriorityDispatcher("TestDispatcher", Broker.MAX_PRIORITY);
-        }
+    protected AdvancedDispatchSPI createDispatcher() {
+        return new AdvancedDispatchSPI(THREAD_POOL_SIZE, Broker.MAX_PRIORITY);
     }
 
     protected int consumerStartDelay = 0;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri Dec  4 19:48:21 2009
@@ -20,7 +20,7 @@
 import java.util.Collection;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
 /**
@@ -31,7 +31,7 @@
  */
 public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements FlowControllable<E>, IFlowQueue<E> {
 
-    protected Dispatcher dispatcher;
+    protected AdvancedDispatchSPI dispatcher;
     protected DispatchContext dispatchContext;
     protected Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners;
     private boolean notifyReady = false;
@@ -132,7 +132,7 @@
      * @param dispatcher
      *            The dispatcher to handle messages.
      */
-    public synchronized void setDispatcher(Dispatcher dispatcher) {
+    public synchronized void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
         dispatchContext = dispatcher.register(new Runnable(){
             public void run() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java Fri Dec  4 19:48:21 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.IFlowRelay;
 
 public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IFlowRelay<E> {
@@ -57,7 +57,7 @@
      * @param dispatcher
      *            The dispatcher to be used by the queue.
      */
-    public void setDispatcher(Dispatcher dispatcher);
+    public void setDispatcher(AdvancedDispatchSPI dispatcher);
 
     /**
      * Sets the base dispatch priority for the queue. Setting to higher value

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Fri Dec  4 19:48:21 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
 import org.apache.activemq.util.Mapper;
 
@@ -47,7 +47,7 @@
      * @param dispatcher
      *            The dispatcher to be used by the queue.
      */
-    public void setDispatcher(Dispatcher dispatcher);
+    public void setDispatcher(AdvancedDispatchSPI dispatcher);
 
     /**
      * Sets the base dispatch priority for the queue. Setting to higher value

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri Dec  4 19:48:21 2009
@@ -20,7 +20,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -30,7 +30,7 @@
     protected HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
     private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
     protected QueueStore<K, V> store;
-    protected Dispatcher dispatcher;
+    protected AdvancedDispatchSPI dispatcher;
     protected boolean started;
     protected boolean shutdown = false;
     protected QueueDescriptor queueDescriptor;
@@ -239,7 +239,7 @@
         this.autoRelease = autoRelease;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         checkShutdown();
         this.dispatcher = dispatcher;
         synchronized (this) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java Fri Dec  4 19:48:21 2009
@@ -10,7 +10,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.AbstractLimiter;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.IFlowLimiter;
@@ -65,12 +65,12 @@
     private final int inputWindowSize = 1000;
     private final int inputResumeThreshold = 500;
 
-    private Dispatcher dispatcher;
+    private AdvancedDispatchSPI dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     protected boolean blockingTransport = false;
     ExecutorService blockingWriter;
 
-    public static void setInShutdown(boolean val, Dispatcher dispatcher) {
+    public static void setInShutdown(boolean val, AdvancedDispatchSPI dispatcher) {
         if (val != inShutdown.getAndSet(val)) {
             if (val) {
                 if (USE_RATE_BASED_LIMITER) {
@@ -274,11 +274,11 @@
         this.priorityLevels = priorityLevels;
     }
 
-    public Dispatcher getDispatcher() {
+    public AdvancedDispatchSPI getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -455,12 +455,12 @@
 
     protected static class RateBasedLimiterCollector implements Runnable {
 
-        private Dispatcher dispatcher;
+        private AdvancedDispatchSPI dispatcher;
         private int samplingPeriod = 50;
         private boolean scheduled = false;
         private HashSet<RateBasedLimiter> limiters = new HashSet<RateBasedLimiter>();
 
-        public synchronized void setDispatcher(Dispatcher d) {
+        public synchronized void setDispatcher(AdvancedDispatchSPI d) {
             if (d != dispatcher) {
                 scheduled = false;
                 dispatcher = d;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Fri Dec  4 19:48:21 2009
@@ -21,8 +21,9 @@
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.Commands.Destination;
@@ -53,7 +54,7 @@
     private TransportServer transportServer;
     private String uri;
     private String name;
-    protected Dispatcher dispatcher;
+    protected AdvancedDispatchSPI dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private boolean useInputQueues = false;
 
@@ -159,7 +160,7 @@
         error.printStackTrace();
     }
 
-    public Dispatcher getDispatcher() {
+    public AdvancedDispatchSPI getDispatcher() {
         return dispatcher;
     }
 
@@ -167,7 +168,7 @@
         this.name = name;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -189,7 +190,7 @@
 
     protected void createDispatcher() {
         if (dispatcher == null) {
-            dispatcher = DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+            dispatcher = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), Message.MAX_PRIORITY);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Fri Dec  4 19:48:21 2009
@@ -20,7 +20,8 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
@@ -63,7 +64,7 @@
     protected MockBroker rcvBroker;
     protected MockClient client;
 
-    protected Dispatcher dispatcher;
+    protected AdvancedDispatchSPI dispatcher;
 
     static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
         public Long map(Message element) {
@@ -94,8 +95,8 @@
         }
     }
 
-    protected Dispatcher createDispatcher(String name) {
-        return DispatcherThread.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher);
+    protected AdvancedDispatchSPI createDispatcher(String name) {
+        return new AdvancedDispatchSPI(threadsPerDispatcher, Message.MAX_PRIORITY);
     }
 
     public void test_1_1_0() throws Exception {
@@ -284,7 +285,7 @@
             }
         }
 
-        Dispatcher clientDispatcher = null;
+        AdvancedDispatchSPI clientDispatcher = null;
         if (SEPARATE_CLIENT_DISPATCHER) {
             clientDispatcher = createDispatcher("ClientDispatcher");
             clientDispatcher.start();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Fri Dec  4 19:48:21 2009
@@ -7,7 +7,8 @@
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
@@ -43,7 +44,7 @@
     protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
     protected boolean includeDetailedRates = false;
 
-    protected Dispatcher dispatcher;
+    protected AdvancedDispatchSPI dispatcher;
 
     public RemoteConsumer consumer(int index) {
         return consumers.get(index);
@@ -214,7 +215,7 @@
         return testName;
     }
 
-    public void setDispatcher(Dispatcher dispatcher) {
+    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -274,13 +275,13 @@
         }
     }
 
-    public Dispatcher getDispatcher() {
+    public AdvancedDispatchSPI getDispatcher() {
         return dispatcher;
     }
 
-    protected Dispatcher createDispatcher() {
+    protected AdvancedDispatchSPI createDispatcher() {
         if (dispatcher == null) {
-            dispatcher = DispatcherThread.createPriorityDispatchPool("ClientDispatcher", numPriorities, threadsPerDispatcher);
+            dispatcher = new AdvancedDispatchSPI(threadsPerDispatcher, numPriorities);
         }
         return dispatcher;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java Fri Dec  4 19:48:21 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.transport;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
+import org.apache.activemq.dispatch.DispatcherAware;
 
 public interface DispatchableTransport extends Transport, DispatcherAware {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Dec  4 19:48:21 2009
@@ -14,7 +14,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -75,7 +75,7 @@
             }
         }
 
-        public void setDispatcher(Dispatcher dispatcher) {
+        public void setDispatcher(AdvancedDispatchSPI dispatcher) {
             readContext = dispatcher.register(new Runnable() {
                 public void run() {
                     dispatch();



Mime
View raw message