activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887686 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activ...
Date Sun, 06 Dec 2009 14:32:40 GMT
Author: chirino
Date: Sun Dec  6 14:32:37 2009
New Revision: 887686

URL: http://svn.apache.org/viewvc?rev=887686&view=rev
Log:
Refactor for better dispatch package class names

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatch.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatchObserver.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteConsumer.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Sun Dec  6 14:32:37 2009
@@ -25,8 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -43,7 +42,7 @@
     protected int inputResumeThreshold = 512 * 1024;
     protected boolean useAsyncWriteThread = true;
 
-    private AdvancedDispatchSPI dispatcher;
+    private Dispatch dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private ExecutorService blockingWriter;
     private ExceptionListener exceptionListener;
@@ -171,11 +170,11 @@
         this.priorityLevels = priorityLevels;
     }
 
-    public AdvancedDispatchSPI getDispatcher() {
+    public Dispatch getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Sun Dec  6 14:32:37 2009
@@ -25,11 +25,9 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.apollo.Connection;
-import org.apache.activemq.dispatch.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
+import org.apache.activemq.dispatch.DispatchAware;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportServer;
@@ -53,7 +51,7 @@
 
     private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
     private VirtualHost defaultVirtualHost;
-    private AdvancedDispatchSPI dispatcher;
+    private Dispatch dispatcher;
     private File dataDirectory;
 
     private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -131,7 +129,7 @@
 		// apply some default configuration to this broker instance before it's started.
 		if( dispatcher == null ) {
 			int threads = Runtime.getRuntime().availableProcessors();
-			dispatcher = new AdvancedDispatchSPI(threads, Broker.MAX_PRIORITY);
+			dispatcher = DispatchFactory.create(getName(), threads);
 		}
 		
 
@@ -186,9 +184,9 @@
         for (VirtualHost virtualHost : virtualHosts.values()) {
         	stop(virtualHost);
         }
-        dispatcher.shutdown();
+        
+        dispatcher.release();
     	state.set(State.STOPPED);
-
     }
         
     // /////////////////////////////////////////////////////////////////
@@ -378,10 +376,10 @@
     // /////////////////////////////////////////////////////////////////
     // Property Accessors
     // /////////////////////////////////////////////////////////////////
-    public AdvancedDispatchSPI getDispatcher() {
+    public Dispatch getDispatcher() {
         return dispatcher;
     }
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
     	assertInConfigurationState();
         this.dispatcher = dispatcher;
     }
@@ -419,8 +417,8 @@
     
 	private void startTransportServer(TransportServer server) throws Exception {
 		server.setAcceptListener(new BrokerAcceptListener());
-		if (server instanceof DispatcherAware ) {
-			((DispatcherAware) server).setDispatcher(dispatcher);
+		if (server instanceof DispatchAware ) {
+			((DispatchAware) server).setDispatcher(dispatcher);
 		}
 		server.start();
 	}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Sun Dec  6 14:32:37 2009
@@ -40,8 +40,8 @@
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.dispatch.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchAware;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -60,7 +60,7 @@
 import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
 
-public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service, DispatcherAware {
+public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service, DispatchAware {
 
     private static final boolean DEBUG = false;
 
@@ -71,7 +71,7 @@
     private final FlowController<OperationBase<?>> storeController;
     private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
-    private AdvancedDispatchSPI dispatcher;
+    private Dispatch dispatcher;
     private Thread flushThread;
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
@@ -328,7 +328,7 @@
 
         if (requestedDelayedFlushPointer == -1) {
             requestedDelayedFlushPointer = delayedFlushPointer;
-            dispatcher.schedule(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS);
+            dispatcher.getGlobalQueue().dispatchAfter(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS);
         }
 
     }
@@ -1288,11 +1288,11 @@
         return store.allocateStoreTracking();
     }
 
-    public AdvancedDispatchSPI getDispatcher() {
+    public Dispatch getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Sun Dec  6 14:32:37 2009
@@ -24,7 +24,7 @@
 
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.flow.SizeLimiter;
@@ -52,7 +52,7 @@
     private static final boolean USE_PRIORITY_QUEUES = true;
 
     private BrokerDatabase database;
-    private AdvancedDispatchSPI dispatcher;
+    private Dispatch dispatcher;
 
     private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
     private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,7 +226,7 @@
         this.database = database;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Sun Dec  6 14:32:37 2009
@@ -30,9 +30,8 @@
 import org.apache.activemq.apollo.broker.Router;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.transport.TransportFactory;
@@ -88,7 +87,7 @@
     protected Broker sendBroker;
     protected Broker rcvBroker;
     protected ArrayList<Broker> brokers = new ArrayList<Broker>();
-    protected AdvancedDispatchSPI dispatcher;
+    protected Dispatch dispatcher;
     protected final AtomicLong msgIdGenerator = new AtomicLong();
     protected final AtomicBoolean stopping = new AtomicBoolean();
 
@@ -135,8 +134,8 @@
 
     protected abstract String getRemoteWireFormat();
 
-    protected AdvancedDispatchSPI createDispatcher() {
-        return new AdvancedDispatchSPI(asyncThreadPoolSize, Broker.MAX_PRIORITY);
+    protected Dispatch createDispatcher() {
+        return DispatchFactory.create("test", asyncThreadPoolSize);
     }
 
     @Test
@@ -539,7 +538,7 @@
             connection.stop();
         }
         if (dispatcher != null) {
-            dispatcher.shutdown();
+            dispatcher.release();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Sun Dec  6 14:32:37 2009
@@ -46,7 +46,7 @@
     protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
         if( schedualWait ) {
             if (thinkTime > 0) {
-                getDispatcher().schedule(new Runnable(){
+                getDispatcher().getGlobalQueue().dispatchAfter(new Runnable(){
                     public void run() {
                         consumerRate.increment();
                         controller.elementDispatched(elem);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Sun Dec  6 14:32:37 2009
@@ -59,7 +59,7 @@
         
         setupProducer();
         
-        dispatchQueue = getDispatcher().createQueue(name + "-client");
+        dispatchQueue = getDispatcher().createSerialQueue(name + "-client");
         dispatchTask = new Runnable(){
             public void run() {
                 dispatch();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Sun Dec  6 14:32:37 2009
@@ -21,15 +21,13 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerDatabase;
 import org.apache.activemq.apollo.broker.BrokerQueueStore;
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
 import org.apache.activemq.queue.IQueue;
 
 /**
@@ -39,7 +37,7 @@
 public class SharedQueueTest extends TestCase {
 
 
-    AdvancedDispatchSPI dispatcher;
+    Dispatch dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -48,8 +46,8 @@
 
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected AdvancedDispatchSPI createDispatcher() {
-        return new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), Broker.MAX_PRIORITY);
+    protected Dispatch createDispatcher() {
+        return DispatchFactory.create("test", Runtime.getRuntime().availableProcessors());
     }
 
     protected int consumerStartDelay = 0;
@@ -77,9 +75,8 @@
     }
 
     protected void stopServices() throws Exception {
-        dispatcher.shutdown();
         database.stop();
-        dispatcher.shutdown();
+        dispatcher.release();
         queues.clear();
     }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatch.java?rev=887686&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatch.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatch.java Sun Dec  6 14:32:37 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+import java.nio.channels.SelectableChannel;
+
+
+public interface Dispatch extends Retained {
+    
+    public void start();
+    public void shutdown(Runnable onShutdown);
+    
+    public DispatchQueue getGlobalQueue();
+    public DispatchQueue getGlobalQueue(DispatchPriority priority);
+    
+    public DispatchQueue createSerialQueue(String label);
+    
+    public DispatchQueue getMainQueue();
+    public void dispatchMain();
+    
+    public DispatchQueue getCurrentQueue();
+
+    public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+    
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchAware.java?rev=887686&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchAware.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchAware.java Sun Dec  6 14:32:37 2009
@@ -0,0 +1,14 @@
+package org.apache.activemq.dispatch;
+
+
+/**
+ * Handy interface to signal classes which would like an DispatchSPI instance
+ * injected into them.
+ *  
+ * @author chirino
+ */
+public interface DispatchAware {
+
+	public void setDispatcher(Dispatch dispatcher);
+	
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchFactory.java?rev=887686&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchFactory.java Sun Dec  6 14:32:37 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchFactory {
+
+    public static Dispatch create() {
+        return create("system", Runtime.getRuntime().availableProcessors());
+    }
+
+    public static Dispatch create(String name, int threads) {
+        return new SimpleDispatchSPI(name, threads);
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java?rev=887686&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java Sun Dec  6 14:32:37 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.internal.simple.DispatcherThread;
+
+public interface DispatchObserver {
+    
+    public void onThreadCreate(DispatcherThread thread);
+    public void onThreadDestroy(DispatcherThread thread);
+
+    public void onQueueCreate(DispatchQueue queue);
+    public void onQueueDestroy(DispatchQueue queue);
+    
+    public void onSourceCreate(DispatchSource source);
+    public void onSourceDestroy(DispatchSource source);
+    
+    public void onDispatchRequest(DispatchQueue target, Runnable request);
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Sun Dec  6 14:32:37 2009
@@ -26,50 +26,34 @@
  */
 public class DispatchSystem {
 
-    public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
-    static public DispatchQueue getCurrentQueue() {
-        return CURRENT_QUEUE.get();
-    }
-
-    private static DispatchSPI spi;
+    final private static Dispatch spi = create();
 
-    public static DispatchSPI create() {
-        return create("system", Runtime.getRuntime().availableProcessors());
+    private static Dispatch create() {
+        return new SimpleDispatchSPI("system", Runtime.getRuntime().availableProcessors());
     }
 
-    public static SimpleDispatchSPI create(String name, int threads) {
-        return new SimpleDispatchSPI(name, threads);
-    }
-    
-    synchronized public static DispatchSPI spi() {
-        if(spi==null) {
-            spi = create();
-        }
-        return spi;
-    }
-    
     static DispatchQueue getMainQueue() {
-        return spi().getMainQueue();
+        return spi.getMainQueue();
     }
     
     static public DispatchQueue getGlobalQueue() {
-        return spi().getGlobalQueue();
+        return spi.getGlobalQueue();
     }
     
     static public DispatchQueue getGlobalQueue(DispatchPriority priority) {
-        return spi().getGlobalQueue(priority);
+        return spi.getGlobalQueue(priority);
     }
     
-    static DispatchQueue createQueue(String label) {
-        return spi().createQueue(label);
+    static DispatchQueue getSerialQueue(String label) {
+        return spi.createSerialQueue(label);
     }
     
     static void dispatchMain() {
-        spi().dispatchMain();
+        spi.dispatchMain();
     }
 
     static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
-        return spi().createSource(channel, interestOps, queue);
+        return spi.createSource(channel, interestOps, queue);
     }
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Sun Dec  6 14:32:37 2009
@@ -22,7 +22,12 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSystem;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
+
+import static org.apache.activemq.dispatch.DispatchFactory.*;
+
+import static org.apache.activemq.dispatch.DispatchFactory.*;
 
 /**
  * 
@@ -84,8 +89,8 @@
     }
 
     public void run() {
-        DispatchQueue original = DispatchSystem.CURRENT_QUEUE.get();
-        DispatchSystem.CURRENT_QUEUE.set(this);
+        DispatchQueue original = CURRENT_QUEUE.get();
+        CURRENT_QUEUE.set(this);
         try {
             Runnable runnable;
             long lsize = size.get();
@@ -104,7 +109,7 @@
                 }
             }
         } finally {
-            DispatchSystem.CURRENT_QUEUE.set(original);
+            CURRENT_QUEUE.set(original);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Sun Dec  6 14:32:37 2009
@@ -24,19 +24,19 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchSPI;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.internal.BaseRetained;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
-import static org.apache.activemq.dispatch.DispatchPriority.*;
 
+public class AdvancedDispatchSPI extends BaseRetained implements Dispatch {
 
-public class AdvancedDispatchSPI extends BaseRetained implements DispatchSPI {
+    public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
 
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[];
@@ -210,7 +210,7 @@
         return globalQueues[priority.ordinal()];
     }
     
-    public DispatchQueue createQueue(String label) {
+    public DispatchQueue createSerialQueue(String label) {
         AdvancedSerialDispatchQueue rc = new AdvancedSerialDispatchQueue(label);
         rc.setTargetQueue(getGlobalQueue());
         return rc;
@@ -222,6 +222,10 @@
 
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
         return null;
+    }
+
+    public DispatchQueue getCurrentQueue() {
+        return CURRENT_QUEUE.get();
     }    
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Sun Dec  6 14:32:37 2009
@@ -25,12 +25,13 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchSystem;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
 import org.apache.activemq.util.list.LinkedNodeList;
 
+import static org.apache.activemq.dispatch.DispatchFactory.*;
+
 public class DispatcherThread implements Runnable {
 
     static public final ThreadLocal<DispatcherThread> CURRENT = new ThreadLocal<DispatcherThread>();
@@ -183,7 +184,7 @@
                 // If no local work available wait for foreign work:
                 while((pdc = priorityQueue.poll())!=null){
                     if( pdc.priority < dispatchQueues.length ) {
-                        DispatchSystem.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
+                        AdvancedDispatchSPI.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
                     }
                     
                     if (pdc.tracker != null) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Sun Dec  6 14:32:37 2009
@@ -20,7 +20,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchSystem;
 
 /**
  * 
@@ -48,7 +47,7 @@
             outer: while( true ) {
                 int counter=0;
                 for (SimpleQueue queue : threadQueues) {
-                    DispatchSystem.CURRENT_QUEUE.set(queue);
+                    SimpleDispatchSPI.CURRENT_QUEUE.set(queue);
                     Runnable runnable;
                     while( (runnable = queue.poll())!=null ) {
                         dispatch(runnable);
@@ -62,7 +61,7 @@
                 }
                 
                 for (SimpleQueue queue : spi.globalQueues) {
-                    DispatchSystem.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
+                    SimpleDispatchSPI.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
                     
                     Runnable runnable;
                     while( (runnable = queue.poll())!=null ) {
@@ -118,7 +117,7 @@
                 break;
             }        
             if( counter==0 ) {
-                DispatchSystem.CURRENT_QUEUE.set(queue);
+                SimpleDispatchSPI.CURRENT_QUEUE.set(queue);
             }
             dispatch(runnable);
             counter++;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java Sun Dec  6 14:32:37 2009
@@ -24,7 +24,7 @@
 
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.internal.BaseRetained;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
@@ -38,8 +38,10 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class SimpleDispatchSPI extends BaseRetained implements DispatchSPI {
+public class SimpleDispatchSPI extends BaseRetained implements Dispatch {
         
+    public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[]; 
     final DispatcherThread dispatchers[];
@@ -72,7 +74,7 @@
         return globalQueues[priority.ordinal()];
     }
     
-    public DispatchQueue createQueue(String label) {
+    public DispatchQueue createSerialQueue(String label) {
         SerialDispatchQueue rc = new SerialDispatchQueue(label) {
             @Override
             public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
@@ -142,5 +144,9 @@
     public String getLabel() {
         return label;
     }
+
+    public DispatchQueue getCurrentQueue() {
+        return CURRENT_QUEUE.get();
+    }
     
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Sun Dec  6 14:32:37 2009
@@ -13,7 +13,7 @@
 import junit.framework.TestCase;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.DispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 
 
@@ -45,10 +45,10 @@
     
     public void testActorInvocation() throws Exception
     {
-        DispatchSPI advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
+        Dispatch advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
         advancedSystem.start();
         
-        DispatchQueue queue = advancedSystem.createQueue("test");
+        DispatchQueue queue = advancedSystem.createSerialQueue("test");
         ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);
         
         CountDownLatch latch = new CountDownLatch(1);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Sun Dec  6 14:32:37 2009
@@ -33,27 +33,27 @@
 public class DispatchSystemTest {
 
     public static void main(String[] args) throws Exception {
-        DispatchSPI advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
+        Dispatch advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
         advancedSystem.start();
         benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
-        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test"));
 
         RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
         advancedSystem.shutdown(latch);
         latch.await();
 
-        DispatchSPI simpleSystem = new SimpleDispatchSPI("test", Runtime.getRuntime().availableProcessors());
+        Dispatch simpleSystem = new SimpleDispatchSPI("test", Runtime.getRuntime().availableProcessors());
         simpleSystem.start();
         
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
-        benchmark("simple private serial queue", simpleSystem, simpleSystem.createQueue("test"));
+        benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test"));
 
         latch = new RunnableCountDownLatch(1);
         simpleSystem.shutdown(latch);
         latch.await();
     }
 
-    private static void benchmark(String name, DispatchSPI spi, DispatchQueue queue) throws InterruptedException {
+    private static void benchmark(String name, Dispatch spi, DispatchQueue queue) throws InterruptedException {
         // warm the JIT up..
         benchmarkWork(spi, queue, 100000);
         
@@ -68,13 +68,13 @@
         System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec", name, durationMS, rate));
     }
 
-    private static void benchmarkWork(final DispatchSPI spi, final DispatchQueue queue, int iterations) throws InterruptedException {
+    private static void benchmarkWork(final Dispatch spi, final DispatchQueue queue, int iterations) throws InterruptedException {
         final CountDownLatch counter = new CountDownLatch(iterations);
         Runnable task = new Runnable(){
             public void run() {
                 counter.countDown();
                 if( counter.getCount()>0 ) {
-                    DispatchSystem.getCurrentQueue().dispatchAsync(this);
+                    spi.getCurrentQueue().dispatchAsync(this);
                 }
             }
         };

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Sun Dec  6 14:32:37 2009
@@ -63,7 +63,7 @@
 
         private Work(CountDownLatch counter, AdvancedDispatchSPI spi) {
             this.counter = counter;
-            dispatchQueue = spi.createQueue("test");
+            dispatchQueue = spi.createSerialQueue("test");
         }
 
         public void run() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java Sun Dec  6 14:32:37 2009
@@ -22,9 +22,8 @@
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.activemq.apollo.broker.Broker;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
 
 @XmlRootElement(name="dispatcher")
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -37,12 +36,11 @@
 	@XmlAttribute(required=false)
 	int threads = Runtime.getRuntime().availableProcessors();
 	
-	public AdvancedDispatchSPI createDispatcher(BrokerXml brokerXml) {
+	public Dispatch createDispatcher(BrokerXml brokerXml) {
 		if( name == null ) {
-//			VirtualHostXml vh = brokerXml.getDefaultVirtualHost();
-			name = "Broker: ";
+			name = "broker";
 		}
-		return new AdvancedDispatchSPI(threads, maxPriority);
+		return DispatchFactory.create(name, threads);
 	}
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Sun Dec  6 14:32:37 2009
@@ -26,7 +26,7 @@
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerFactory;
 import org.apache.activemq.broker.store.memory.MemoryStore;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Test;
@@ -44,7 +44,7 @@
 		LOG.info("Loading broker configuration from the classpath with URI: " + uri);
 		Broker broker = BrokerFactory.createBroker(uri);
 		
-		AdvancedDispatchSPI p = (AdvancedDispatchSPI)broker.getDispatcher();
+		Dispatch p = (Dispatch)broker.getDispatcher();
 //		assertEquals(4, p.getSize());
 //		assertEquals("test dispatcher", p.getName());
 		

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Sun Dec  6 14:32:37 2009
@@ -75,6 +75,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.LogicExpression;
@@ -693,7 +694,7 @@
             }
             controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
             controller.useOverFlowQueue(false);
-            controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
+            controller.setExecutor(connection.getDispatcher().getGlobalQueue(DispatchPriority.HIGH));
             super.onFlowOpened(controller);
         }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Dec  6 14:32:37 2009
@@ -26,7 +26,6 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerDatabase;
 import org.apache.activemq.apollo.broker.BrokerQueueStore;
 import org.apache.activemq.apollo.broker.MessageDelivery;
@@ -39,8 +38,10 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.DispatchFactory;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -64,7 +65,7 @@
 
     private static int PERFORMANCE_SAMPLES = 5;
 
-    AdvancedDispatchSPI dispatcher;
+    Dispatch dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -81,8 +82,8 @@
     protected ArrayList<Producer> producers = new ArrayList<Producer>();
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected AdvancedDispatchSPI createDispatcher() {
-        return new AdvancedDispatchSPI(THREAD_POOL_SIZE, Broker.MAX_PRIORITY);
+    protected Dispatch createDispatcher() {
+        return DispatchFactory.create("pref-test", THREAD_POOL_SIZE);
     }
 
     protected int consumerStartDelay = 0;
@@ -104,9 +105,8 @@
     }
 
     protected void stopServices() throws Exception {
-        dispatcher.shutdown();
         database.stop();
-        dispatcher.shutdown();
+        dispatcher.release();
         consumers.clear();
         producers.clear();
         queues.clear();
@@ -216,7 +216,7 @@
             };
 
             if (consumerStartDelay > 0) {
-                dispatcher.schedule(startConsumers, consumerStartDelay, TimeUnit.SECONDS);
+                dispatcher.getGlobalQueue().dispatchAfter(startConsumers, consumerStartDelay, TimeUnit.SECONDS);
             } else {
                 startConsumers.run();
             }
@@ -308,7 +308,7 @@
             sendRate.name("Producer " + name + " Rate");
             totalProducerRate.add(sendRate);
             
-            dispatchQueue = dispatcher.createQueue(name);
+            dispatchQueue = dispatcher.createSerialQueue(name);
             dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();
@@ -332,7 +332,7 @@
 
             Flow flow = new Flow(name, true);
             outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
-            outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+            outboundQueue.setFlowExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
             outboundQueue.setDrain(new QueueDispatchTarget<OpenWireMessageDelivery>() {
 
                 public void drain(OpenWireMessageDelivery elem, ISourceController<OpenWireMessageDelivery> controller) {
@@ -449,7 +449,7 @@
 
             controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
             controller.useOverFlowQueue(false);
-            controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+            controller.setExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
 
             rate.name("Consumer " + name + " Rate");
             totalConsumerRate.add(rate);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java Sun Dec  6 14:32:37 2009
@@ -22,6 +22,7 @@
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
@@ -70,7 +71,7 @@
                 return null;
             }
         }, flow, limiter, inboundMutex);
-        inboundController.setExecutor(getDispatcher().createPriorityExecutor(getDispatcher().getDispatchPriorities() - 1));
+        inboundController.setExecutor(getDispatcher().getGlobalQueue(DispatchPriority.HIGH));
 
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Sun Dec  6 14:32:37 2009
@@ -20,7 +20,8 @@
 import java.util.Collection;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
 /**
@@ -31,7 +32,7 @@
  */
 public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements FlowControllable<E>, IFlowQueue<E> {
 
-    protected AdvancedDispatchSPI dispatcher;
+    protected Dispatch dispatcher;
     protected Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners;
     private boolean notifyReady = false;
     protected int dispatchPriority = 0;
@@ -87,7 +88,7 @@
         }
         
         stop();
-        dispatchQueue.setFinalizer(onShutdown);
+        dispatchQueue.setShutdownHandler(onShutdown);
         dispatchQueue.release();
         dispatchQueue = null;
     }
@@ -131,10 +132,10 @@
      * @param dispatcher
      *            The dispatcher to handle messages.
      */
-    public synchronized void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public synchronized void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
         
-        dispatchQueue = dispatcher.createQueue(getResourceName());
+        dispatchQueue = dispatcher.createSerialQueue(getResourceName());
         dispatchTask = new Runnable(){
             public void run() {
                 if( pollingDispatch() ) {
@@ -146,7 +147,7 @@
 //        TODO:
 //        dispatchContext.updatePriority(dispatchPriority);
         
-        super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+        super.setFlowExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
     }
 
     public synchronized void setDispatchPriority(int priority) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Sun Dec  6 14:32:37 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.queue;
 
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
@@ -170,7 +171,7 @@
                 }
             };
             controller.useOverFlowQueue(false);
-            controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+            controller.setExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
         }
 
         cursor = queue.openCursor(getResourceName(), memoryController, true, true);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java Sun Dec  6 14:32:37 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.flow.IFlowRelay;
 
 public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IFlowRelay<E> {
@@ -57,7 +57,7 @@
      * @param dispatcher
      *            The dispatcher to be used by the queue.
      */
-    public void setDispatcher(AdvancedDispatchSPI dispatcher);
+    public void setDispatcher(Dispatch dispatcher);
 
     /**
      * Sets the base dispatch priority for the queue. Setting to higher value

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Sun Dec  6 14:32:37 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
 import org.apache.activemq.util.Mapper;
 
@@ -47,7 +47,7 @@
      * @param dispatcher
      *            The dispatcher to be used by the queue.
      */
-    public void setDispatcher(AdvancedDispatchSPI dispatcher);
+    public void setDispatcher(Dispatch dispatcher);
 
     /**
      * Sets the base dispatch priority for the queue. Setting to higher value

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Sun Dec  6 14:32:37 2009
@@ -21,7 +21,7 @@
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -31,7 +31,7 @@
     protected HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
     private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
     protected QueueStore<K, V> store;
-    protected AdvancedDispatchSPI dispatcher;
+    protected Dispatch dispatcher;
     protected boolean started;
     protected boolean shutdown = false;
     protected QueueDescriptor queueDescriptor;
@@ -253,7 +253,7 @@
         this.autoRelease = autoRelease;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         checkShutdown();
         this.dispatcher = dispatcher;
         synchronized (this) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Sun Dec  6 14:32:37 2009
@@ -18,6 +18,7 @@
 
 import java.util.HashMap;
 
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
@@ -273,7 +274,7 @@
                 }
             };
             controller.useOverFlowQueue(false);
-            controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+            controller.setExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
         }
 
         return queue.openCursor(name, controller, pageInElements, skipAcquired);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java Sun Dec  6 14:32:37 2009
@@ -10,7 +10,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.flow.AbstractLimiter;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.IFlowLimiter;
@@ -65,12 +66,12 @@
     private final int inputWindowSize = 1000;
     private final int inputResumeThreshold = 500;
 
-    private AdvancedDispatchSPI dispatcher;
+    private Dispatch dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     protected boolean blockingTransport = false;
     ExecutorService blockingWriter;
 
-    public static void setInShutdown(boolean val, AdvancedDispatchSPI dispatcher) {
+    public static void setInShutdown(boolean val, Dispatch dispatcher) {
         if (val != inShutdown.getAndSet(val)) {
             if (val) {
                 if (USE_RATE_BASED_LIMITER) {
@@ -104,7 +105,7 @@
                 dt.setName(name + "-transport");
             }
             dt.setDispatcher(getDispatcher());
-            dt.setDispatchPriority(dispatcher.getDispatchPriorities() - 1);
+            dt.setDispatchPriority(DispatchPriority.HIGH);
         }
         transport.start();
     }
@@ -153,7 +154,7 @@
             queue.getFlowController(flow).useOverFlowQueue(false);
             inputQueue = queue;
         }
-        inputQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+        inputQueue.setFlowExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
         inputQueue.setDrain(new QueueDispatchTarget<Message>() {
 
             public void drain(Message message, ISourceController<Message> controller) {
@@ -191,7 +192,7 @@
             queue.addFlowReadyListener(asyncCommandQueue);
         }
         // Set the executor to be used by the queue's flow controllers:
-        outputQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+        outputQueue.setFlowExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
 
         limiter.start();
         outboundLimiter.start();
@@ -274,11 +275,11 @@
         this.priorityLevels = priorityLevels;
     }
 
-    public AdvancedDispatchSPI getDispatcher() {
+    public Dispatch getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -455,12 +456,12 @@
 
     protected static class RateBasedLimiterCollector implements Runnable {
 
-        private AdvancedDispatchSPI dispatcher;
+        private Dispatch dispatcher;
         private int samplingPeriod = 50;
         private boolean scheduled = false;
         private HashSet<RateBasedLimiter> limiters = new HashSet<RateBasedLimiter>();
 
-        public synchronized void setDispatcher(AdvancedDispatchSPI d) {
+        public synchronized void setDispatcher(Dispatch d) {
             if (d != dispatcher) {
                 scheduled = false;
                 dispatcher = d;
@@ -517,7 +518,7 @@
                 }
                 if (!scheduled && !limiters.isEmpty()) {
                     scheduled = true;
-                    dispatcher.schedule(this, samplingPeriod, TimeUnit.MILLISECONDS);
+                    dispatcher.getGlobalQueue().dispatchAfter(this, samplingPeriod, TimeUnit.MILLISECONDS);
                 }
             }
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Sun Dec  6 14:32:37 2009
@@ -21,10 +21,9 @@
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
+import org.apache.activemq.dispatch.DispatchAware;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.transport.Transport;
@@ -54,7 +53,7 @@
     private TransportServer transportServer;
     private String uri;
     private String name;
-    protected AdvancedDispatchSPI dispatcher;
+    protected Dispatch dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private boolean useInputQueues = false;
 
@@ -126,8 +125,8 @@
 
         transportServer = TransportFactory.bind(new URI(uri));
         transportServer.setAcceptListener(this);
-        if (transportServer instanceof DispatcherAware) {
-            ((DispatcherAware) transportServer).setDispatcher(dispatcher);
+        if (transportServer instanceof DispatchAware) {
+            ((DispatchAware) transportServer).setDispatcher(dispatcher);
         }
         transportServer.start();
 
@@ -160,7 +159,7 @@
         error.printStackTrace();
     }
 
-    public AdvancedDispatchSPI getDispatcher() {
+    public Dispatch getDispatcher() {
         return dispatcher;
     }
 
@@ -168,7 +167,7 @@
         this.name = name;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -190,7 +189,7 @@
 
     protected void createDispatcher() {
         if (dispatcher == null) {
-            dispatcher = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), Message.MAX_PRIORITY);
+            dispatcher = DispatchFactory.create("mock-broker", Runtime.getRuntime().availableProcessors());
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Sun Dec  6 14:32:37 2009
@@ -20,9 +20,8 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
@@ -64,7 +63,7 @@
     protected MockBroker rcvBroker;
     protected MockClient client;
 
-    protected AdvancedDispatchSPI dispatcher;
+    protected Dispatch dispatcher;
 
     static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
         public Long map(Message element) {
@@ -95,8 +94,8 @@
         }
     }
 
-    protected AdvancedDispatchSPI createDispatcher(String name) {
-        return new AdvancedDispatchSPI(threadsPerDispatcher, Message.MAX_PRIORITY);
+    protected Dispatch createDispatcher(String name) {
+        return DispatchFactory.create("test", threadsPerDispatcher);
     }
 
     public void test_1_1_0() throws Exception {
@@ -285,7 +284,7 @@
             }
         }
 
-        AdvancedDispatchSPI clientDispatcher = null;
+        Dispatch clientDispatcher = null;
         if (SEPARATE_CLIENT_DISPATCHER) {
             clientDispatcher = createDispatcher("ClientDispatcher");
             clientDispatcher.start();
@@ -344,9 +343,8 @@
             broker.stopServices();
         }
 
-        client.getDispatcher().shutdown();
         if (dispatcher != null) {
-            dispatcher.shutdown();
+            dispatcher.release();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Sun Dec  6 14:32:37 2009
@@ -7,9 +7,8 @@
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.Dispatch;
+import org.apache.activemq.dispatch.DispatchFactory;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
@@ -44,7 +43,7 @@
     protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
     protected boolean includeDetailedRates = false;
 
-    protected AdvancedDispatchSPI dispatcher;
+    protected Dispatch dispatcher;
 
     public RemoteConsumer consumer(int index) {
         return consumers.get(index);
@@ -215,7 +214,7 @@
         return testName;
     }
 
-    public void setDispatcher(AdvancedDispatchSPI dispatcher) {
+    public void setDispatcher(Dispatch dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -275,13 +274,13 @@
         }
     }
 
-    public AdvancedDispatchSPI getDispatcher() {
+    public Dispatch getDispatcher() {
         return dispatcher;
     }
 
-    protected AdvancedDispatchSPI createDispatcher() {
+    protected Dispatch createDispatcher() {
         if (dispatcher == null) {
-            dispatcher = new AdvancedDispatchSPI(threadsPerDispatcher, numPriorities);
+            dispatcher = DispatchFactory.create("client", threadsPerDispatcher);
         }
         return dispatcher;
     }
@@ -311,7 +310,7 @@
         }
         finally
         {
-            test.getDispatcher().shutdown();
+            test.getDispatcher().release();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteConsumer.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteConsumer.java Sun Dec  6 14:32:37 2009
@@ -1,14 +1,11 @@
 package org.apache.activemq.queue.perf;
 
-import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.TransportFactory;
 
 public class RemoteConsumer extends ClientConnection {
 
@@ -32,7 +29,7 @@
     protected void messageReceived(final ISourceController<Message> controller, final Message elem) {
         if (schedualWait) {
             if (thinkTime > 0) {
-                getDispatcher().schedule(new Runnable() {
+                getDispatcher().getGlobalQueue().dispatchAfter(new Runnable() {
 
                     public void run() {
                         consumerRate.increment();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java Sun Dec  6 14:32:37 2009
@@ -48,7 +48,7 @@
         super.start();
         outboundController = outputQueue.getFlowController(outboundFlow);
         
-        dispatchQueue = getDispatcher().createQueue(name + "-client");
+        dispatchQueue = getDispatcher().createSerialQueue(name + "-client");
         dispatchTask = new Runnable(){
             public void run() {
                 dispatch();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java Sun Dec  6 14:32:37 2009
@@ -9,6 +9,7 @@
 import org.apache.activemq.apollo.stomp.StompFrame;
 import org.apache.activemq.apollo.stomp.StompMessageDelivery;
 import org.apache.activemq.broker.RemoteConsumer;
+import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
@@ -64,7 +65,7 @@
                 return null;
             }
         }, flow, limiter, inboundMutex);
-        inboundController.setExecutor(getDispatcher().createPriorityExecutor(getDispatcher().getDispatchPriorities() - 1));
+        inboundController.setExecutor(getDispatcher().getGlobalQueue(DispatchPriority.HIGH));
     }
     
     public void onCommand(Object command) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java Sun Dec  6 14:32:37 2009
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.transport;
 
-import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchAware;
 
-public interface DispatchableTransport extends Transport, DispatcherAware {
+public interface DispatchableTransport extends Transport, DispatchAware {
 
-    public void setDispatchPriority(int priority);
+    public void setDispatchPriority(DispatchPriority priority);
 
     public void setName(String name);
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887686&r1=887685&r2=887686&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Sun Dec  6 14:32:37 2009
@@ -13,9 +13,11 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatch;
 import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -69,7 +71,7 @@
         	pipe.write(EOF_TOKEN);
             if (dispatchQueue != null) {
                 RunnableCountDownLatch done = new RunnableCountDownLatch(1);
-                dispatchQueue.setFinalizer(done);
+                dispatchQueue.setShutdownHandler(done);
                 dispatchQueue.release();
                 done.await();
             } else {
@@ -80,8 +82,8 @@
             }
         }
 
-        public void setDispatcher(AdvancedDispatchSPI dispatcher) {
-            dispatchQueue = dispatcher.createQueue(name);
+        public void setDispatcher(Dispatch dispatcher) {
+            dispatchQueue = dispatcher.createSerialQueue(name);
             dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();
@@ -227,7 +229,7 @@
             this.wireFormat = wireFormat;
         }
 
-        public void setDispatchPriority(int priority) {
+        public void setDispatchPriority(DispatchPriority priority) {
 //            TODO:
 //            readContext.updatePriority(priority);
         }



Mime
View raw message