activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887365 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/...
Date Fri, 04 Dec 2009 21:35:59 GMT
Author: chirino
Date: Fri Dec  4 21:35:23 2009
New Revision: 887365

URL: http://svn.apache.org/viewvc?rev=887365&view=rev
Log:
ported to the DispachSPI api

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchObserver.java
      - copied, changed from r887337, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
      - copied, changed from r887337, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Fri Dec  4 21:35:23 2009
@@ -60,9 +60,9 @@
         }
     }
 
-    public void shutdown(boolean sync) throws Exception {
+    public void shutdown(Runnable onShutdown) throws Exception {
         if (queue != null) {
-            queue.shutdown(sync);
+            queue.shutdown(onShutdown);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Fri Dec  4 21:35:23 2009
@@ -499,7 +499,7 @@
 
             //If we've reached the end of the op queue
             if (opQueue.getEnqueuedCount() == 0) {
-                opQueue.shutdown(false);
+                opQueue.shutdown(null);
             }
         }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Fri Dec  4 21:35:23 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.apollo.broker.path.PathFilter;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -161,14 +162,20 @@
         if (!started) {
             return;
         }
-        for (Queue queue : queues.values()) {
-            queue.shutdown(true);
-        }
-
-        for (IQueue<Long, MessageDelivery> queue : queueStore.getDurableQueues()) {
-            queue.shutdown(true);
+        ArrayList<Queue> q = new ArrayList<Queue>(queues.values());
+        RunnableCountDownLatch done = new RunnableCountDownLatch(q.size());
+        for (Queue queue : q) {
+            queue.shutdown(done);
+        }
+        done.await();
+
+        ArrayList<IQueue<Long, MessageDelivery>> durableQueues = new ArrayList<IQueue<Long,MessageDelivery>>(queueStore.getDurableQueues());
+        done = new RunnableCountDownLatch(durableQueues.size());
+        for (IQueue<Long, MessageDelivery> queue : durableQueues) {
+            queue.shutdown(done);
         }
-
+        done.await();
+        
         database.stop();
         started = false;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Fri Dec  4 21:35:23 2009
@@ -6,7 +6,7 @@
 import org.apache.activemq.apollo.Connection;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISinkController;
@@ -29,13 +29,15 @@
     protected String property;
     protected MetricAggregator totalProducerRate;
     protected MessageDelivery next;
-    protected DispatchContext dispatchContext;
+    protected DispatchQueue dispatchQueue;
+    protected Runnable dispatchTask;
     protected String filler;
     protected int payloadSize = 20;
     protected URI uri;
 
     protected IFlowController<MessageDelivery> outboundController;
     protected IFlowSink<MessageDelivery> outboundQueue;
+
     
     public void start() throws Exception {
         
@@ -57,12 +59,13 @@
         
         setupProducer();
         
-        dispatchContext = getDispatcher().register(new Runnable(){
+        dispatchQueue = getDispatcher().createQueue(name + "-client");
+        dispatchTask = new Runnable(){
             public void run() {
                 dispatch();
             }
-        }, name + "-client");
-        dispatchContext.requestDispatch();
+        };
+        dispatchQueue.dispatchAsync(dispatchTask);
 
     }
     
@@ -96,12 +99,12 @@
 
     public void stop() throws Exception
     {
-    	dispatchContext.close(false);
+    	dispatchQueue.release();
     	super.stop();
     }
     
 	public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
-		dispatchContext.requestDispatch();
+        dispatchQueue.dispatchAsync(dispatchTask);
 	}
 
     protected String createPayload() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Fri Dec  4 21:35:23 2009
@@ -22,8 +22,12 @@
  */
 public interface DispatchObject {
     
+    public void retain();
+    public void release();
+
     public <Context> Context getContext();
     public <Context> void setContext(Context context);
+
     public void suspend();
     public void resume();
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec  4 21:35:23 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.dispatch.internal;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.dispatch.DispatchObject;
 import org.apache.activemq.dispatch.DispatchQueue;
 
@@ -25,9 +27,9 @@
  */
 abstract public class AbstractDispatchObject implements DispatchObject {
 
-    protected Object context;
-    protected Runnable finalizer;
-    protected DispatchQueue targetQueue;
+    protected volatile Object context;
+    protected volatile DispatchQueue targetQueue;
+    protected AtomicReference<Runnable> finalizer = new AtomicReference<Runnable>();
 
     @SuppressWarnings("unchecked")
     public <Context> Context getContext() {
@@ -39,7 +41,7 @@
     }
 
     public void setFinalizer(Runnable finalizer) {
-        this.finalizer = finalizer;
+        this.finalizer.set(finalizer);
     }
 
     public void setTargetQueue(DispatchQueue targetQueue) {

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java?rev=887365&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java Fri Dec  4 21:35:23 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.CountDownLatch;
+
+public class RunnableCountDownLatch extends CountDownLatch implements Runnable {
+    public RunnableCountDownLatch(int count) {
+        super(count);
+    }
+    public void run() {
+        countDown();
+    }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Fri Dec  4 21:35:23 2009
@@ -31,6 +31,7 @@
 
     private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
     final private String label;
+    final private AtomicInteger reatinCounter = new AtomicInteger(1);
     final private AtomicInteger suspendCounter = new AtomicInteger();
     final private AtomicLong size = new AtomicLong();
     
@@ -60,9 +61,12 @@
         if( runnable == null ) {
             throw new IllegalArgumentException();
         }
-        long lastSize = size.incrementAndGet();
+        long lastSize = size.getAndIncrement();
+        if( lastSize==0 ) {
+            retain();
+        }
         runnables.add(runnable);
-        if( targetQueue!=null && lastSize == 1 && suspendCounter.get()<=0 ) {
+        if( targetQueue!=null && lastSize == 0 && suspendCounter.get()<=0 ) {
             targetQueue.dispatchAsync(this);
         }
     }
@@ -79,6 +83,9 @@
                     if( runnable!=null ) {
                         runnable.run();
                         lsize = size.decrementAndGet();
+                        if( lsize==0 ) {
+                            release();
+                        }
                     }
                 } catch (Throwable e) {
                     e.printStackTrace();
@@ -96,4 +103,19 @@
     public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
         QueueSupport.dispatchApply(this, iterations, runnable);
     }
+
+    public void retain() {
+        int prev = reatinCounter.getAndIncrement();
+        assert prev!=0;
+    }
+
+    public void release() {
+        if( reatinCounter.decrementAndGet()==0 ) {
+            Runnable value = finalizer.getAndSet(null);
+            if( value!=null ) {
+                value.run();
+            }
+        }
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Fri Dec  4 21:35:23 2009
@@ -27,7 +27,6 @@
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSPI;
 import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.LoadBalancer;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
@@ -177,9 +176,9 @@
         }
     }
 
-    public DispatchContext register(Runnable runnable, String name) {
-        return chooseDispatcher().register(runnable, name);
-    }
+//    public DispatchContext register(Runnable runnable, String name) {
+//        return chooseDispatcher().register(runnable, name);
+//    }
 
 	public int getSize() {
 		return size;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java Fri Dec  4 21:35:23 2009
@@ -1,68 +1,260 @@
 package org.apache.activemq.dispatch.internal.advanced;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 
-import org.apache.activemq.dispatch.DispatchObserver;
-
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread.UpdateEvent;
+import org.apache.activemq.util.list.LinkedNode;
 
 /**
- * Returned to callers registered with this dispathcer. Used by the caller
- * to inform the dispatcher that it is ready for dispatch.
  * 
- * Note that DispatchContext is not safe for concurrent access by multiple
- * threads.
  */
-public interface DispatchContext {
-    
-    /**
-     * Once registered with a dispatcher, this can be called to request
-     * dispatch. The {@link Dispatchable} will remain in the dispatch queue
-     * until a subsequent call to {@link Dispatchable#dispatch()} returns
-     * false;
-     * 
-     * @throws RejectedExecutionException If the dispatcher has been shutdown.
-     */
-    public void requestDispatch() throws RejectedExecutionException;
+class DispatchContext extends LinkedNode<DispatchContext> {
+
+    private final DispatcherThread dispacher;
+    // The target:
+    private final Runnable runnable;
+    // The name of this context:
+    final String name;
+    // list prio can only be updated in the thread of of the owning
+    // dispatcher
+    protected int listPrio;
+
+    // The update events are used to update fields in the dispatch context
+    // from foreign threads:
+    final UpdateEvent updateEvent[];
+
+    final DispatchObserver tracker;
+    protected DispatcherThread currentOwner;
+    private DispatcherThread updateDispatcher = null;
+
+    int priority;
+    private boolean dispatchRequested = false;
+    private boolean closed = false;
+    final CountDownLatch closeLatch = new CountDownLatch(1);
+
+    protected DispatchContext(DispatcherThread dispatcherThread, Runnable runnable, boolean persistent, String name) {
+        dispacher = dispatcherThread;
+        this.runnable = runnable;
+        this.name = name;
+        this.currentOwner = (DispatcherThread) dispacher;
+        if (persistent && dispacher.spi != null) {
+            this.tracker = dispacher.spi.getLoadBalancer().createExecutionTracker((DispatchContext) this);
+        } else {
+            this.tracker = null;
+        }
+        updateEvent = createUpdateEvent();
+        updateEvent[0] = dispacher.new UpdateEvent(this);
+        updateEvent[1] = dispacher.new UpdateEvent(this);
+        if (persistent) {
+            currentOwner.takeOwnership(this);
+        }
+    }
+
+    private final DispatcherThread.UpdateEvent[] createUpdateEvent() {
+        return new DispatcherThread.UpdateEvent[2];
+    }
 
     /**
-     * This can be called to update the dispatch priority.
+     * Gets the execution tracker for the context.
      * 
-     * @param priority
+     * @return the execution tracker for the context:
      */
-    public void updatePriority(int priority);
+    public DispatchObserver getExecutionTracker() {
+        return tracker;
+    }
 
     /**
-     * Gets the name of the dispatch context
+     * This can only be called by the owning dispatch thread:
      * 
-     * @return The dispatchable
+     * @return False if the dispatchable has more work to do.
      */
-    public String getName();
+    public final void run() {
+        runnable.run();
+    }
 
-    /**
-     * This must be called to release any resource the dispatcher is holding
-     * on behalf of this context. Once called this {@link DispatchContext} should
-     * no longer be used. 
-     */
-    public void close(boolean sync);
-    
-    /**
-     * Called to transfer a {@link PooledDispatchContext} to a new
-     * Dispatcher.
-     */
-    public void setTargetQueue(DispatcherThread newDispatcher);
+    public final void setTargetQueue(DispatcherThread newDispatcher) {
+        synchronized (this) {
 
-    /**
-     * Gets the dispatcher to which this PooledDispatchContext currently
-     * belongs
-     * 
-     * @return
-     */
-    public DispatcherThread getTargetQueue();
+            // If we're already set to this dispatcher
+            if (newDispatcher == currentOwner) {
+                if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+                    return;
+                }
+            }
+
+            updateDispatcher = (DispatcherThread) newDispatcher;
+            if (DispatcherThread.DEBUG)
+                System.out.println(getName() + " updating to " + updateDispatcher);
+
+            currentOwner.onForeignUpdate(this);
+        }
+
+    }
+
+    public void requestDispatch() {
+
+        DispatcherThread callingDispatcher = dispacher.getCurrentDispatcher();
+        if (tracker != null)
+            tracker.onDispatch(callingDispatcher, dispacher.getCurrentDispatchContext());
+
+        // Otherwise this is coming off another thread, so we need to
+        // synchronize
+        // to protect against ownership changes:
+        synchronized (this) {
+            // If the owner of this context is the calling thread, then
+            // delegate to the dispatcher.
+            if (currentOwner == callingDispatcher) {
+
+                if (!currentOwner.running) {
+                    // TODO In the event that the current dispatcher
+                    // failed due to a runtime exception, we could
+                    // try to switch to a new dispatcher.
+                    throw new RejectedExecutionException();
+                }
+                if (!isLinked()) {
+                    currentOwner.priorityQueue.add(this, listPrio);
+                }
+                return;
+            }
+
+            dispatchRequested = true;
+            currentOwner.onForeignUpdate(this);
+        }
+    }
+
+    public void updatePriority(int priority) {
+
+        if (closed) {
+            return;
+        }
+
+        priority = Math.min(priority, dispacher.MAX_USER_PRIORITY);
+
+        if (this.priority == priority) {
+            return;
+        }
+        DispatcherThread callingDispatcher = dispacher.getCurrentDispatcher();
+
+        // Otherwise this is coming off another thread, so we need to
+        // synchronize to protect against ownership changes:
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            this.priority = priority;
+
+            // If this is called by the owning dispatcher, then we go ahead
+            // and update:
+            if (currentOwner == callingDispatcher) {
+
+                if (priority != listPrio) {
+
+                    listPrio = priority;
+                    // If there is a priority change relink the context
+                    // at the new priority:
+                    if (isLinked()) {
+                        unlink();
+                        currentOwner.priorityQueue.add(this, listPrio);
+                    }
+                }
+                return;
+            }
+
+            currentOwner.onForeignUpdate(this);
+        }
+
+    }
+
+    public void processForeignUpdates() {
+        synchronized (this) {
+
+            if (closed) {
+                close(false);
+                return;
+            }
+
+            if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
+                if (DispatcherThread.DEBUG) {
+                    System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+                }
+
+                if (currentOwner.removeDispatchContext(this)) {
+                    dispatchRequested = true;
+                }
+
+                updateDispatcher.onForeignUpdate(this);
+                switchedDispatcher(currentOwner, updateDispatcher);
+                currentOwner = updateDispatcher;
+                updateDispatcher = null;
+
+            } else {
+                updatePriority(priority);
+
+                if (dispatchRequested) {
+                    dispatchRequested = false;
+                    requestDispatch();
+                }
+            }
+        }
+    }
 
     /**
-     * Gets the execution tracker for the context.
+     * May be overriden by subclass to additional work on dispatcher switch
      * 
-     * @return the execution tracker for the context:
+     * @param oldDispatcher The old dispatcher
+     * @param newDispatcher The new Dispatcher
      */
-    public DispatchObserver getExecutionTracker();    
+    protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
+
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void close(boolean sync) {
+        DispatcherThread callingDispatcher = dispacher.getCurrentDispatcher();
+        // System.out.println(this + "Closing");
+        synchronized (this) {
+            closed = true;
+            // If the owner of this context is the calling thread, then
+            // delegate to the dispatcher.
+            if (currentOwner == callingDispatcher) {
+                dispacher.removeDispatchContext(this);
+                closeLatch.countDown();
+                return;
+            }
+        }
+
+        currentOwner.onForeignUpdate(this);
+        if (sync) {
+            boolean interrupted = false;
+            while (true) {
+                try {
+                    closeLatch.await();
+                    break;
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public final String toString() {
+        return getName();
+    }
+
+    public DispatcherThread getTargetQueue() {
+        return currentOwner;
+    }
+
+    public String getName() {
+        return name;
+    }
+
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchObserver.java (from r887337, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchObserver.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchObserver.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java&r1=887337&r2=887365&rev=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchObserver.java Fri Dec  4 21:35:23 2009
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
 public interface DispatchObserver {
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec  4 21:35:23 2009
@@ -18,15 +18,12 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.DispatchObserver;
 import org.apache.activemq.dispatch.DispatchSystem;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
 import org.apache.activemq.util.Mapper;
@@ -39,18 +36,18 @@
 
     private final ThreadDispatchQueue dispatchQueues[];
     
-    private static final boolean DEBUG = false;
+    static final boolean DEBUG = false;
     private Thread thread;
     protected boolean running = false;
     private boolean threaded = false;
     protected final int MAX_USER_PRIORITY;
-    protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
+    protected final HashSet<DispatchContext> contexts = new HashSet<DispatchContext>();
 
     // Set if this dispatcher is part of a dispatch pool:
     protected final AdvancedDispatchSPI spi;
 
     // The local dispatch queue:
-    protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+    protected final PriorityLinkedList<DispatchContext> priorityQueue;
 
     // Dispatch queue for requests from other threads:
     private final LinkedNodeList<ForeignEvent>[] foreignQueue;
@@ -69,8 +66,8 @@
     private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
     private final Semaphore foreignPermits = new Semaphore(0);
 
-    private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
-        public Integer map(PriorityDispatchContext element) {
+    private final Mapper<Integer, DispatchContext> PRIORITY_MAPPER = new Mapper<Integer, DispatchContext>() {
+        public Integer map(DispatchContext element) {
             return element.listPrio;
         }
     };
@@ -84,7 +81,7 @@
         }
 
         MAX_USER_PRIORITY = priorities - 1;
-        priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+        priorityQueue = new PriorityLinkedList<DispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
         foreignQueue = createForeignEventQueue();
         for (int i = 0; i < 2; i++) {
             foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
@@ -124,10 +121,10 @@
         return MAX_USER_PRIORITY;
     }
 
-    private class UpdateEvent extends ForeignEvent {
-        private final PriorityDispatchContext pdc;
+    class UpdateEvent extends ForeignEvent {
+        private final DispatchContext pdc;
 
-        UpdateEvent(PriorityDispatchContext pdc) {
+        UpdateEvent(DispatchContext pdc) {
             this.pdc = pdc;
         }
 
@@ -138,7 +135,7 @@
     }
 
     public DispatchContext register(Runnable runnable, String name) {
-        return new PriorityDispatchContext(runnable, true, name);
+        return new DispatchContext(this, runnable, true, name);
     }
 
     /*
@@ -191,14 +188,14 @@
     }
 
     protected void cleanup() {
-        ArrayList<PriorityDispatchContext> toClose = null;
+        ArrayList<DispatchContext> toClose = null;
         synchronized (this) {
             running = false;
-            toClose = new ArrayList<PriorityDispatchContext>(contexts.size());
+            toClose = new ArrayList<DispatchContext>(contexts.size());
             toClose.addAll(contexts);
         }
 
-        for (PriorityDispatchContext context : toClose) {
+        for (DispatchContext context : toClose) {
             context.close(false);
         }
     }
@@ -210,7 +207,7 @@
             spi.onDispatcherStarted((DispatcherThread) this);
         }
 
-        PriorityDispatchContext pdc;
+        DispatchContext pdc;
         try {
             while (running) {
                 int counter = 0;
@@ -303,7 +300,7 @@
         foreignPermits.release();
     }
 
-    protected final void onForeignUpdate(PriorityDispatchContext context) {
+    protected final void onForeignUpdate(DispatchContext context) {
         synchronized (foreignQueue) {
 
             ForeignEvent fe = context.updateEvent[foreignToggle];
@@ -316,7 +313,7 @@
         }
     }
 
-    protected final boolean removeDispatchContext(PriorityDispatchContext context) {
+    protected final boolean removeDispatchContext(DispatchContext context) {
         synchronized (foreignQueue) {
 
             if (context.updateEvent[0].isLinked()) {
@@ -339,7 +336,7 @@
         return false;
     }
 
-    protected final boolean takeOwnership(PriorityDispatchContext context) {
+    protected final boolean takeOwnership(DispatchContext context) {
         synchronized (this) {
             if (running) {
                 contexts.add(context);
@@ -352,7 +349,7 @@
 
     //Special dispatch method that allow high priority dispatch:
     private final void dispatchInternal(Runnable runnable, int priority) {
-        PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
+        DispatchContext context = new DispatchContext(this, runnable, false, name);
         context.priority = priority;
         context.requestDispatch();
     }
@@ -365,7 +362,7 @@
      * .dispatch.Dispatcher.Dispatchable)
      */
     public final void dispatch(Runnable runnable, int priority) {
-        PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
+        DispatchContext context = new DispatchContext(this, runnable, false, name);
         context.updatePriority(priority);
         context.requestDispatch();
     }
@@ -425,7 +422,7 @@
         return name;
     }
 
-    private final DispatcherThread getCurrentDispatcher() {
+    final DispatcherThread getCurrentDispatcher() {
         if (spi != null) {
             return (DispatcherThread) spi.getCurrentDispatcher();
         } else if (Thread.currentThread() == thread) {
@@ -436,260 +433,10 @@
 
     }
 
-    private final DispatchContext getCurrentDispatchContext() {
+    final DispatchContext getCurrentDispatchContext() {
         return spi.getCurrentDispatchContext();
     }
 
-    /**
-     * 
-     */
-    protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements DispatchContext {
-        // The target:
-        private final Runnable runnable;
-        // The name of this context:
-        final String name;
-        // list prio can only be updated in the thread of of the owning
-        // dispatcher
-        protected int listPrio;
-
-        // The update events are used to update fields in the dispatch context
-        // from foreign threads:
-        final UpdateEvent updateEvent[];
-
-        private final DispatchObserver tracker;
-        protected DispatcherThread currentOwner;
-        private DispatcherThread updateDispatcher = null;
-
-        private int priority;
-        private boolean dispatchRequested = false;
-        private boolean closed = false;
-        final CountDownLatch closeLatch = new CountDownLatch(1);
-
-        protected PriorityDispatchContext(Runnable runnable, boolean persistent, String name) {
-            this.runnable = runnable;
-            this.name = name;
-            this.currentOwner = (DispatcherThread) DispatcherThread.this;
-            if (persistent && spi != null) {
-                this.tracker = spi.getLoadBalancer().createExecutionTracker((DispatchContext) this);
-            } else {
-                this.tracker = null;
-            }
-            updateEvent = createUpdateEvent();
-            updateEvent[0] = new UpdateEvent(this);
-            updateEvent[1] = new UpdateEvent(this);
-            if (persistent) {
-                currentOwner.takeOwnership(this);
-            }
-        }
-
-        private final DispatcherThread.UpdateEvent[] createUpdateEvent() {
-            return new DispatcherThread.UpdateEvent[2];
-        }
-
-        /**
-         * Gets the execution tracker for the context.
-         * 
-         * @return the execution tracker for the context:
-         */
-        public DispatchObserver getExecutionTracker() {
-            return tracker;
-        }
-
-        /**
-         * This can only be called by the owning dispatch thread:
-         * 
-         * @return False if the dispatchable has more work to do.
-         */
-        public final void run() {
-            runnable.run();
-        }
-
-        public final void setTargetQueue(DispatcherThread newDispatcher) {
-            synchronized (this) {
-
-                // If we're already set to this dispatcher
-                if (newDispatcher == currentOwner) {
-                    if (updateDispatcher == null || updateDispatcher == newDispatcher) {
-                        return;
-                    }
-                }
-
-                updateDispatcher = (DispatcherThread) newDispatcher;
-                if (DEBUG)
-                    System.out.println(getName() + " updating to " + updateDispatcher);
-
-                currentOwner.onForeignUpdate(this);
-            }
-
-        }
-
-        public void requestDispatch() {
-
-            DispatcherThread callingDispatcher = getCurrentDispatcher();
-            if (tracker != null)
-                tracker.onDispatch(callingDispatcher, getCurrentDispatchContext());
-
-            // Otherwise this is coming off another thread, so we need to
-            // synchronize
-            // to protect against ownership changes:
-            synchronized (this) {
-                // If the owner of this context is the calling thread, then
-                // delegate to the dispatcher.
-                if (currentOwner == callingDispatcher) {
-
-                    if (!currentOwner.running) {
-                        // TODO In the event that the current dispatcher
-                        // failed due to a runtime exception, we could
-                        // try to switch to a new dispatcher.
-                        throw new RejectedExecutionException();
-                    }
-                    if (!isLinked()) {
-                        currentOwner.priorityQueue.add(this, listPrio);
-                    }
-                    return;
-                }
-
-                dispatchRequested = true;
-                currentOwner.onForeignUpdate(this);
-            }
-        }
-
-        public void updatePriority(int priority) {
-
-            if (closed) {
-                return;
-            }
-
-            priority = Math.min(priority, MAX_USER_PRIORITY);
-
-            if (this.priority == priority) {
-                return;
-            }
-            DispatcherThread callingDispatcher = getCurrentDispatcher();
-
-            // Otherwise this is coming off another thread, so we need to
-            // synchronize to protect against ownership changes:
-            synchronized (this) {
-                if (closed) {
-                    return;
-                }
-                this.priority = priority;
-
-                // If this is called by the owning dispatcher, then we go ahead
-                // and update:
-                if (currentOwner == callingDispatcher) {
-
-                    if (priority != listPrio) {
-
-                        listPrio = priority;
-                        // If there is a priority change relink the context
-                        // at the new priority:
-                        if (isLinked()) {
-                            unlink();
-                            currentOwner.priorityQueue.add(this, listPrio);
-                        }
-                    }
-                    return;
-                }
-
-                currentOwner.onForeignUpdate(this);
-            }
-
-        }
-
-        public void processForeignUpdates() {
-            synchronized (this) {
-
-                if (closed) {
-                    close(false);
-                    return;
-                }
-
-                if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
-                    if (DEBUG) {
-                        System.out.println("Assigning " + getName() + " to " + updateDispatcher);
-                    }
-
-                    if (currentOwner.removeDispatchContext(this)) {
-                        dispatchRequested = true;
-                    }
-
-                    updateDispatcher.onForeignUpdate(this);
-                    switchedDispatcher(currentOwner, updateDispatcher);
-                    currentOwner = updateDispatcher;
-                    updateDispatcher = null;
-
-                } else {
-                    updatePriority(priority);
-
-                    if (dispatchRequested) {
-                        dispatchRequested = false;
-                        requestDispatch();
-                    }
-                }
-            }
-        }
-
-        /**
-         * May be overriden by subclass to additional work on dispatcher switch
-         * 
-         * @param oldDispatcher The old dispatcher
-         * @param newDispatcher The new Dispatcher
-         */
-        protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
-
-        }
-
-        public boolean isClosed() {
-            return closed;
-        }
-
-        public void close(boolean sync) {
-            DispatcherThread callingDispatcher = getCurrentDispatcher();
-            // System.out.println(this + "Closing");
-            synchronized (this) {
-                closed = true;
-                // If the owner of this context is the calling thread, then
-                // delegate to the dispatcher.
-                if (currentOwner == callingDispatcher) {
-                    removeDispatchContext(this);
-                    closeLatch.countDown();
-                    return;
-                }
-            }
-
-            currentOwner.onForeignUpdate(this);
-            if (sync) {
-                boolean interrupted = false;
-                while (true) {
-                    try {
-                        closeLatch.await();
-                        break;
-                    } catch (InterruptedException e) {
-                        interrupted = true;
-                    }
-                }
-
-                if (interrupted) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-
-        public final String toString() {
-            return getName();
-        }
-
-        public DispatcherThread getTargetQueue() {
-            return currentOwner;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-    }
-
     public String getName() {
         return name;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec  4 21:35:23 2009
@@ -85,4 +85,10 @@
     public DispatchQueue getTargetQueue() {
         throw new UnsupportedOperationException();
     }
+
+    public void release() {
+    }
+
+    public void retain() {
+    }
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java (from r887337, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java&r1=887337&r2=887365&rev=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java Fri Dec  4 21:35:23 2009
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Fri Dec  4 21:35:23 2009
@@ -22,8 +22,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.DispatchObserver;
-import org.apache.activemq.dispatch.LoadBalancer;
 
 
 public class SimpleLoadBalancer implements LoadBalancer {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Fri Dec  4 21:35:23 2009
@@ -86,4 +86,10 @@
         throw new UnsupportedOperationException();
     }
 
+    public void release() {
+    }
+
+    public void retain() {
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Fri Dec  4 21:35:23 2009
@@ -103,4 +103,10 @@
     public DispatchQueuePriority getPriority() {
         return priority;
     }
+
+    public void release() {
+    }
+
+    public void retain() {
+    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Fri Dec  4 21:35:23 2009
@@ -123,4 +123,10 @@
         return priority;
     }
 
+    public void release() {
+    }
+
+    public void retain() {
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec  4 21:35:23 2009
@@ -18,6 +18,7 @@
 
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
 
@@ -30,15 +31,6 @@
  */
 public class DispatchSystemTest {
 
-    public static class RunnableCountDownLatch extends CountDownLatch implements Runnable {
-        public RunnableCountDownLatch(int count) {
-            super(count);
-        }
-        public void run() {
-            countDown();
-        }
-    }
-    
     public static void main(String[] args) throws Exception {
         DispatchSPI advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
         advancedSystem.start();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec  4 21:35:23 2009
@@ -18,6 +18,8 @@
 
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.activemq.dispatch.DispatchQueue;
+
 
 import static java.lang.String.*;
 
@@ -50,24 +52,24 @@
         final CountDownLatch counter = new CountDownLatch(iterations);
         for (int i = 0; i < 1000; i++) {
             Work dispatchable = new Work(counter, pooledDispatcher);
-            dispatchable.context.requestDispatch();
+            dispatchable.dispatchQueue.dispatchAsync(dispatchable);
         }
         counter.await();
     }
     
     private static final class Work implements Runnable {
         private final CountDownLatch counter;
-        private final DispatchContext context;
+        private final DispatchQueue dispatchQueue;
 
         private Work(CountDownLatch counter, AdvancedDispatchSPI spi) {
             this.counter = counter;
-            this.context = spi.register(this , "test");
+            dispatchQueue = spi.createQueue("test");
         }
 
         public void run() {
             counter.countDown();
             if( counter.getCount()>0 ) {
-                context.requestDispatch();
+                dispatchQueue.dispatchAsync(this);
             }
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec  4 21:35:23 2009
@@ -39,8 +39,8 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -288,7 +288,8 @@
         private String name;
         protected final MetricCounter sendRate = new MetricCounter();
 		AtomicBoolean waitingForAck = new AtomicBoolean();
-        private final DispatchContext dispatchContext;
+        private final DispatchQueue dispatchQueue;
+        private final Runnable dispatchTask;
 
         protected IFlowController<OpenWireMessageDelivery> outboundController;
         protected final AbstractFlowRelay<OpenWireMessageDelivery> outboundQueue;
@@ -306,11 +307,14 @@
             this.name = name;
             sendRate.name("Producer " + name + " Rate");
             totalProducerRate.add(sendRate);
-            dispatchContext = dispatcher.register(new Runnable(){
+            
+            dispatchQueue = dispatcher.createQueue(name);
+            dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();
                 }
-            }, name);
+            };
+ 
             // create a 1024 byte payload (2 bytes per char):
             payload = new String(new byte[512]);
             producerId = new ProducerId(name);
@@ -353,7 +357,7 @@
         }
 
         public void start() {
-            dispatchContext.requestDispatch();
+            dispatchQueue.dispatchAsync(dispatchTask);
         }
 
         public void stop() {
@@ -383,7 +387,7 @@
 						next.setPersistListener(new PersistListener() {
 							public void onMessagePersisted(OpenWireMessageDelivery delivery) {
 								waitingForAck.set(false);
-								dispatchContext.requestDispatch();
+					            dispatchQueue.dispatchAsync(dispatchTask);
 							}
 						});
 					}
@@ -398,7 +402,7 @@
             outboundQueue.add(next, null);
             next = null;
             if ( !stopped.get() ) {
-                dispatchContext.requestDispatch();
+                dispatchQueue.dispatchAsync(dispatchTask);
             }
         }
 
@@ -416,7 +420,7 @@
         }
 
         public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> controller) {
-            dispatchContext.requestDispatch();
+            dispatchQueue.dispatchAsync(dispatchTask);
         }
 
         public String toString() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri Dec  4 21:35:23 2009
@@ -19,7 +19,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
@@ -32,7 +32,6 @@
 public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements FlowControllable<E>, IFlowQueue<E> {
 
     protected AdvancedDispatchSPI dispatcher;
-    protected DispatchContext dispatchContext;
     protected Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners;
     private boolean notifyReady = false;
     protected int dispatchPriority = 0;
@@ -44,6 +43,8 @@
     };
     protected boolean started;
     protected Subscription<E> sub;
+    protected DispatchQueue dispatchQueue;
+    protected Runnable dispatchTask;
 
     AbstractFlowQueue() {
         super();
@@ -78,19 +79,17 @@
      * Calls stop and cleans up resources associated with the queue.
      * 
      * @param sync
+     * @throws InterruptedException 
      */
-    public void shutdown(boolean sync) {
-        stop();
-        DispatchContext dc = null;
-        synchronized (this) {
-            dc = dispatchContext;
-            dispatchContext = null;
-
-        }
-
-        if (dc != null) {
-            dc.close(sync);
+    public void shutdown(Runnable onShutdown)  {
+        if( dispatchQueue == null ) {
+            throw new IllegalStateException();
         }
+        
+        stop();
+        dispatchQueue.setFinalizer(onShutdown);
+        dispatchQueue.release();
+        dispatchQueue = null;
     }
 
     /**
@@ -134,21 +133,28 @@
      */
     public synchronized void setDispatcher(AdvancedDispatchSPI dispatcher) {
         this.dispatcher = dispatcher;
-        dispatchContext = dispatcher.register(new Runnable(){
+        
+        dispatchQueue = dispatcher.createQueue(getResourceName());
+        dispatchTask = new Runnable(){
             public void run() {
                 if( pollingDispatch() ) {
-                    dispatchContext.requestDispatch();
+                    dispatchQueue.dispatchAsync(dispatchTask);
                 }
-            }}, getResourceName());
-        dispatchContext.updatePriority(dispatchPriority);
+            }
+        };
+        
+//        TODO:
+//        dispatchContext.updatePriority(dispatchPriority);
+        
         super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
     }
 
     public synchronized void setDispatchPriority(int priority) {
         dispatchPriority = priority;
-        if (dispatchContext != null) {
-            dispatchContext.updatePriority(priority);
-        }
+//        TODO:
+//        if (dispatchContext != null) {
+//            dispatchContext.updatePriority(priority);
+//        }
     }
 
     public synchronized void addFlowReadyListener(IPollableFlowSource.FlowReadyListener<E> watcher) {
@@ -177,8 +183,8 @@
      * Indicates that there are elements ready for dispatch.
      */
     protected void notifyReady() {
-        if (dispatchContext != null) {
-            dispatchContext.requestDispatch();
+        if (dispatchQueue != null) {
+            dispatchQueue.dispatchAsync(dispatchTask);
             return;
         }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Fri Dec  4 21:35:23 2009
@@ -1651,9 +1651,8 @@
     }
 
     /**
-     * @param sync
      */
-    public void shutdown(boolean sync) {
+    public void shutdown() {
         stop();
         if (!openCursors.isEmpty()) {
             ArrayList<Cursor<V>> cursors = new ArrayList<Cursor<V>>(openCursors.size());

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Fri Dec  4 21:35:23 2009
@@ -259,11 +259,17 @@
         }
     }
 
-    public void shutdown(boolean sync) {
-        super.shutdown(sync);
-        synchronized (this) {
-            queue.shutdown(sync);
-        }
+    public void shutdown(final Runnable onShutdown) {
+        super.shutdown(new Runnable() {
+            public void run() {
+                synchronized (ExclusivePersistentQueue.this) {
+                    queue.shutdown();
+                }
+                if( onShutdown!=null ) {
+                    onShutdown.run();
+                }
+            }
+        });
     }
 
     public FlowController<E> getFlowController(Flow flow) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Fri Dec  4 21:35:23 2009
@@ -87,12 +87,8 @@
      * the queue is shutdown will thrown an {@link IllegalStateException} unless
      * otherwise documented.
      * 
-     * @param sync
-     *            If true will cause the calling thread to block until all
-     *            resources held by the queue are cleaned up. Otherwise, the
-     *            queue shutdown will proceed asynchronously.
      */
-    public void shutdown(boolean sync);
+    public void shutdown(Runnable onShutdown);
     
     /**
      * Removes the element with the given sequence from this queue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri Dec  4 21:35:23 2009
@@ -19,6 +19,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.flow.ISourceController;
@@ -169,7 +170,7 @@
         }
     }
 
-    public void shutdown(boolean sync) {
+    public void shutdown(final Runnable onShutdown) {
         Collection<IQueue<K, V>> partitions = null;
         synchronized (this) {
             if (!shutdown) {
@@ -181,9 +182,22 @@
         }
 
         if (partitions != null) {
+
+            Runnable wrapper=null;
+            if( onShutdown!=null ) {
+                final AtomicInteger  countDown = new AtomicInteger(partitions.size());
+                wrapper = new Runnable() {
+                    public void run() {
+                        if( countDown.decrementAndGet()==0 ) {
+                            onShutdown.run();
+                        }
+                    }
+                };
+            }
+            
             for (IQueue<K, V> partition : partitions) {
                 if (partition != null)
-                    partition.shutdown(sync);
+                    partition.shutdown(wrapper);
             }
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri Dec  4 21:35:23 2009
@@ -38,9 +38,9 @@
     }
 
     @Override
-    public void shutdown(boolean sync) {
+    public void shutdown(Runnable onShutdown) {
         try {
-            super.shutdown(sync);
+            super.shutdown(onShutdown);
         } finally {
             partitions.clear();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri Dec  4 21:35:23 2009
@@ -337,11 +337,17 @@
         }
     }
 
-    public void shutdown(boolean sync) {
-        super.shutdown(sync);
-        synchronized (mutex) {
-            queue.shutdown(sync);
-        }
+    public void shutdown(final Runnable onShutdown) {
+        super.shutdown(new Runnable() {
+            public void run() {
+                synchronized (mutex) {
+                    queue.shutdown();
+                }
+                if( onShutdown!=null ) {
+                    onShutdown.run();
+                }
+            }
+        });
     }
 
     public void add(V elem, ISourceController<?> source) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java Fri Dec  4 21:35:23 2009
@@ -2,7 +2,7 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
@@ -24,12 +24,14 @@
     private String property;
     private MetricAggregator totalProducerRate;
     Message next;
-    private DispatchContext dispatchContext;
 
     private String filler;
     private int payloadSize = 0;
     IFlowController<Message> outboundController;
 
+    private DispatchQueue dispatchQueue;
+    private Runnable dispatchTask;
+
     public void start() throws Exception {
 
         if (payloadSize > 0) {
@@ -45,21 +47,24 @@
 
         super.start();
         outboundController = outputQueue.getFlowController(outboundFlow);
-        dispatchContext = getDispatcher().register(new Runnable() {
+        
+        dispatchQueue = getDispatcher().createQueue(name + "-client");
+        dispatchTask = new Runnable(){
             public void run() {
                 dispatch();
             }
-        }, name + "-client");
-        dispatchContext.requestDispatch();
+        };
+        dispatchQueue.dispatchAsync(dispatchTask);
+        
     }
 
     public void stop() throws Exception {
-        dispatchContext.close(false);
+        dispatchQueue.release();
         super.stop();
     }
 
     public void onFlowUnblocked(ISinkController<Message> controller) {
-        dispatchContext.requestDispatch();
+        dispatchQueue.dispatchAsync(dispatchTask);
     }
 
     public void dispatch() {
@@ -87,7 +92,7 @@
             getSink().add(next, null);
             rate.increment();
             next = null;
-            dispatchContext.requestDispatch();
+            dispatchQueue.dispatchAsync(dispatchTask);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887365&r1=887364&r2=887365&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Dec  4 21:35:23 2009
@@ -13,7 +13,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
@@ -43,20 +44,21 @@
         private String remoteAddress;
         private AtomicBoolean stopping = new AtomicBoolean();
         private Thread thread;
-        private DispatchContext readContext;
         private String name;
         private WireFormat wireFormat;
         private boolean marshal;
         private boolean trace;
+        private DispatchQueue dispatchQueue;
+        private Runnable dispatchTask;
 
         public PipeTransport(Pipe<Object> pipe) {
             this.pipe = pipe;
         }
 
         public void start() throws Exception {
-            if (readContext != null) {
+            if (dispatchQueue != null) {
                 pipe.setMode(Pipe.ASYNC);
-                readContext.requestDispatch();
+                dispatchQueue.dispatchAsync(dispatchTask);
             } else {
                 thread = new Thread(this, getRemoteAddress());
                 thread.start();
@@ -65,8 +67,11 @@
 
         public void stop() throws Exception {
         	pipe.write(EOF_TOKEN);
-            if (readContext != null) {
-                readContext.close(true);
+            if (dispatchQueue != null) {
+                RunnableCountDownLatch done = new RunnableCountDownLatch(1);
+                dispatchQueue.setFinalizer(done);
+                dispatchQueue.release();
+                done.await();
             } else {
                 stopping.set(true);
                 if( thread!=null ) {
@@ -76,16 +81,17 @@
         }
 
         public void setDispatcher(AdvancedDispatchSPI dispatcher) {
-            readContext = dispatcher.register(new Runnable() {
+            dispatchQueue = dispatcher.createQueue(name);
+            dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();
                 }
-            }, name);
+            };
         }
 
         public void onReadReady(Pipe<Object> pipe) {
-            if (readContext != null) {
-                readContext.requestDispatch();
+            if (dispatchQueue != null) {
+                dispatchQueue.dispatchAsync(dispatchTask);
             }
         }
 
@@ -127,7 +133,7 @@
                         } else {
                             listener.onCommand(o);
                         }
-                        readContext.requestDispatch();
+                        dispatchQueue.dispatchAsync(dispatchTask);
                         return;
                     }
                 } catch (IOException e) {
@@ -222,7 +228,8 @@
         }
 
         public void setDispatchPriority(int priority) {
-            readContext.updatePriority(priority);
+//            TODO:
+//            readContext.updatePriority(priority);
         }
 
         public WireFormat getWireformat() {



Mime
View raw message