activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r752957 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/...
Date Thu, 12 Mar 2009 18:04:50 GMT
Author: chirino
Date: Thu Mar 12 18:04:49 2009
New Revision: 752957

URL: http://svn.apache.org/viewvc?rev=752957&view=rev
Log:
Applied https://issues.apache.org/activemq/browse/AMQ-2156 Thx Colin!

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java Thu Mar 12 18:04:49 2009
@@ -106,7 +106,7 @@
      */
     public void onDispatcherStarted(D d) {
         dispatcher.set(d);
-        loadBalancer.addDispatcher(d);
+        loadBalancer.onDispatcherStarted(d);
     }
 
     public ExecutionLoadBalancer<D> getLoadBalancer() {
@@ -122,7 +122,7 @@
                 size--;
             }
         }
-        loadBalancer.removeDispatcher(d);
+        loadBalancer.onDispatcherStopped(d);
     }
 
     protected D chooseDispatcher() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Thu Mar 12 18:04:49 2009
@@ -16,23 +16,54 @@
  */
 package org.apache.activemq.dispatch;
 
+
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 
 public interface ExecutionLoadBalancer<D extends IDispatcher> {
 
     public interface ExecutionTracker<D extends IDispatcher> {
+        
+        /**
+         * Should be called when a {@link DispatchContext#requestDispatch()} is called.
+         * This assists the load balancer in determining relationships between {@link DispatchContext}s
+         * @param caller The calling dispatcher
+         * @param context The context from which the dispatch is requested.
+         */
         public void onDispatchRequest(D caller, PooledDispatchContext<D> context);
 
+        /**
+         * Must be called by the dispatcher when a {@link DispatchContext} is closed.
+         */
         public void close();
     }
-
-    public void addDispatcher(D dispatcher);
-
-    public void removeDispatcher(D dispatcher);
-
+    
+    /**
+     * Must be called by a dispatch thread when it starts
+     * @param dispatcher The dispatcher
+     */
+    public void onDispatcherStarted(D dispatcher);
+
+    /**
+     * Must be called by a dispatch thread when it stops
+     * @param dispatcher The dispatcher
+     */
+    public void onDispatcherStopped(D dispatcher);
+
+    /**
+     * Gets an {@link ExecutionTracker} for the dispatch context. 
+     * @param context
+     * @return
+     */
     public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context);
 
+    /**
+     * Starts execution tracking
+     */
     public void start();
 
+    /**
+     * Stops execution tracking
+     */
     public void stop();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Thu Mar 12 18:04:49 2009
@@ -20,7 +20,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
-public interface IDispatcher {
+public interface IDispatcher extends Executor{
 
     /**
      * This interface is implemented by Dispatchable entities. A Dispatchable
@@ -115,6 +115,13 @@
     public DispatchContext register(Dispatchable dispatchable, String name);
 
     /**
+     * Gets the number of dispatch priorities. Dispatch priorities are 0 based, 
+     * so if the number of dispatch priorities is 10, the maximum priority is 9.
+     * @return the number of dispatch priorities.
+     */
+    public int getDispatchPriorities();
+    
+    /**
      * Creates an executor that will execute its tasks at the specified
      * priority.
      * 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Thu Mar 12 18:04:49 2009
@@ -39,7 +39,7 @@
     private boolean threaded = false;
     protected final int MAX_USER_PRIORITY;
     protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
-
+    
     // Set if this dispatcher is part of a dispatch pool:
     protected final PooledDispatcher<D> pooledDispatcher;
 
@@ -66,7 +66,7 @@
 
     protected PriorityDispatcher(String name, int priorities, PooledDispatcher<D> pooledDispactcher) {
         this.name = name;
-        MAX_USER_PRIORITY = priorities;
+        MAX_USER_PRIORITY = priorities - 1;
         priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
         foreignQueue = createForeignEventQueue();
         for (int i = 0; i < 2; i++) {
@@ -95,6 +95,15 @@
                     }
                 };
             }
+
+            public int getDispatchPriorities() {
+                // TODO Auto-generated method stub
+                return numPriorities;
+            }
+
+            public void execute(Runnable command) {
+                chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
+            }
         };
     }
 
@@ -126,6 +135,10 @@
         this.threaded = threaded;
     }
 
+    public int getDispatchPriorities() {
+        return MAX_USER_PRIORITY;
+    }
+
     private class UpdateEvent extends ForeignEvent {
         private final PriorityDispatchContext pdc;
 
@@ -197,8 +210,12 @@
 
         // Inform the dispatcher that we have started:
         pooledDispatcher.onDispatcherStarted((D) this);
+
         PriorityDispatchContext pdc;
         try {
+            final int MAX_DISPATCH_PER_LOOP = 20;
+            int processed = 0;
+
             while (running) {
                 pdc = priorityQueue.poll();
                 // If no local work available wait for foreign work:
@@ -210,9 +227,9 @@
                     }
 
                     while (!pdc.dispatch()) {
-                        // If there is a higher priority dispatchable stop
-                        // processing this one:
-                        if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+                        processed++;
+                        if (processed > MAX_DISPATCH_PER_LOOP || pdc.listPrio < priorityQueue.getHighestPriority()) {
+                            // Give other dispatchables a shot:
                             // May have gotten relinked by the caller:
                             if (!pdc.isLinked()) {
                                 priorityQueue.add(pdc, pdc.listPrio);
@@ -221,9 +238,16 @@
                         }
                     }
 
-                    pooledDispatcher.setCurrentDispatchContext(null);
+                    if (pdc.tracker != null) {
+                        pooledDispatcher.setCurrentDispatchContext(null);
+                    }
+
+                    if (processed < MAX_DISPATCH_PER_LOOP) {
+                        continue;
+                    }
                 }
 
+                processed = 0;
                 // Execute delayed events:
                 timerHeap.executeReadyTimers();
 
@@ -275,7 +299,12 @@
      * @throws Exception
      */
     protected void waitForEvents() throws Exception {
-        foreignPermits.acquire();
+        long next = timerHeap.timeToNext(TimeUnit.NANOSECONDS);
+        if (next == -1) {
+            foreignPermits.acquire();
+        } else if (next > 0) {
+            foreignPermits.tryAcquire(next, TimeUnit.NANOSECONDS);
+        }
     }
 
     /**
@@ -424,7 +453,6 @@
             this.dispatchable = dispatchable;
             this.name = name;
             this.currentOwner = (D) PriorityDispatcher.this;
-            this.currentOwner.contexts.add(this);
             if (persistent) {
                 this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
             } else {
@@ -433,6 +461,7 @@
             updateEvent = createUpdateEvent();
             updateEvent[0] = new UpdateEvent(this);
             updateEvent[1] = new UpdateEvent(this);
+            currentOwner.takeOwnership(this);
         }
 
         @SuppressWarnings("unchecked")

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Thu Mar 12 18:04:49 2009
@@ -23,7 +23,7 @@
 
 public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
 
-    private final boolean DEBUG = false;
+    private final boolean DEBUG = true;
 
     public SimpleLoadBalancer() {
     }
@@ -44,11 +44,11 @@
         }
     }
 
-    public void addDispatcher(D dispatcher) {
+    public void onDispatcherStarted(D dispatcher) {
 
     }
 
-    public void removeDispatcher(D dispatcher) {
+    public void onDispatcherStopped(D dispatcher) {
 
     }
 
@@ -57,7 +57,7 @@
 
     public void stop() {
     }
-
+    
     public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
         return new SimpleExecutionTracker<D>(context);
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java Thu Mar 12 18:04:49 2009
@@ -18,8 +18,10 @@
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.concurrent.Executor;
 
 public abstract class AbstractLimitedFlowResource<E> implements IFlowResource {
+    private Executor flowExecutor = null;
     private final HashSet<FlowLifeCycleListener> lifeCycleWatchers = new HashSet<FlowLifeCycleListener>();
     private final HashMap<Flow, IFlowController<E>> openControllers = new HashMap<Flow, IFlowController<E>>();
 
@@ -63,20 +65,22 @@
      *            The new controller.
      */
     protected synchronized final void onFlowOpened(IFlowController<E> controller) {
-    	IFlowController<E> existing = openControllers.put(controller.getFlow(), controller);
+        IFlowController<E> existing = openControllers.put(controller.getFlow(), controller);
         if (existing != null && existing != controller) {
             // Put the existing controller back:
             openControllers.put(controller.getFlow(), existing);
             throw new IllegalStateException("Flow already opened" + existing);
         }
-
+        if (flowExecutor != null) {
+            controller.setExecutor(flowExecutor);
+        }
         for (FlowLifeCycleListener listener : lifeCycleWatchers) {
             listener.onFlowOpened(this, controller.getFlow());
         }
     }
 
     protected synchronized final void onFlowClosed(Flow flow) {
-    	IFlowController<E> existing = openControllers.remove(flow);
+        IFlowController<E> existing = openControllers.remove(flow);
 
         if (existing != null) {
             for (FlowLifeCycleListener listener : lifeCycleWatchers) {
@@ -85,6 +89,15 @@
         }
     }
 
+    public synchronized final void setFlowExecutor(Executor executor) {
+        if (executor != flowExecutor) {
+            flowExecutor = executor;
+            for (IFlowController<E> controller : openControllers.values()) {
+                controller.setExecutor(flowExecutor);
+            }
+        }
+    }
+
     public synchronized void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
         lifeCycleWatchers.remove(listener);
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Thu Mar 12 18:04:49 2009
@@ -28,6 +28,7 @@
  */
 public class FlowController<E> implements IFlowController<E> {
 
+    private static final Executor DEFAULT_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     // Sinks that are blocking us.
     private final HashSet<ISinkController<E>> blockingSinks = new HashSet<ISinkController<E>>();
 
@@ -77,6 +78,7 @@
     private boolean throttleReg;
     private boolean notifyUnblock = false;
     private String name;
+    private Executor executor = DEFAULT_EXECUTOR;
 
     public FlowController() {
         this.unthrottleListener = new UnThrottleListener() {
@@ -226,14 +228,16 @@
             if (okToAdd(elem)) {
                 ok = true;
                 if (limiter.add(elem)) {
-                    setUnThrottleListener();
                     blockSource(sourceController);
                 }
             } else {
                 // Add to overflow queue and block source:
                 overflowQueue.add(elem);
-                setUnThrottleListener();
-                blockSource(sourceController);
+                if (sourceController != null) {
+                    blockSource(sourceController);
+                } else if (!resuming) {
+                    setUnThrottleListener();
+                }
             }
         }
         if (ok) {
@@ -263,12 +267,10 @@
             if (okToAdd(elem)) {
                 if (limiter.add(elem)) {
                     blockSource(sourceController);
-                    setUnThrottleListener();
                 }
                 ok = true;
             } else {
                 blockSource(sourceController);
-                setUnThrottleListener();
             }
         }
         if (ok) {
@@ -313,10 +315,14 @@
             return;
         }
 
-        // If we are currently in the process of resuming we
-        // must wait for resume to complete, before we add to
-        // the blocked list:
-        waitForResume();
+        // TODO This could allow a single source to completely overflow the
+        // queue
+        // during resume
+        if (resuming) {
+            return;
+        }
+
+        setUnThrottleListener();
 
         if (!blockedSources.contains(source)) {
             // System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" +
@@ -361,8 +367,8 @@
      */
     public final boolean addUnblockListener(FlowUnblockListener<E> listener) {
         synchronized (mutex) {
-            waitForResume();
-            if (limiter.getThrottled() || !overflowQueue.isEmpty()) {
+            if (!resuming && (limiter.getThrottled() || !overflowQueue.isEmpty())) {
+                setUnThrottleListener();
                 unblockListeners.add(listener);
                 return true;
             }
@@ -370,21 +376,6 @@
         return false;
     }
 
-    private final void waitForResume() {
-        boolean interrupted = false;
-        while (resuming) {
-            try {
-                mutex.wait();
-            } catch (InterruptedException e) {
-                interrupted = true;
-            }
-        }
-
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
     /**
      * Releases blocked sources providing the limiter isn't throttled, and there
      * are no elements on the blocked list.
@@ -424,7 +415,7 @@
             };
 
             try {
-                RESUME_SERVICE.execute(resume);
+                executor.execute(resume);
             } catch (RejectedExecutionException ree) {
                 // Must be shutting down, ignore this, leaving resumeScheduled
                 // true
@@ -432,12 +423,6 @@
         }
     }
 
-    private static Executor RESUME_SERVICE = Executors.newCachedThreadPool();
-
-    public static final void setFlowExecutor(Executor executor) {
-        RESUME_SERVICE = executor;
-    }
-
     public String toString() {
         return name;
     }
@@ -445,4 +430,11 @@
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }
+
+    public void setExecutor(Executor executor) {
+        synchronized (mutex) {
+            this.executor = executor;
+        }
+
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java Thu Mar 12 18:04:49 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.flow;
 
+import java.util.concurrent.Executor;
+
 public interface IFlowSink<E> extends IFlowResource {
     /**
      * Adds an element to the sink. If limiter space in the sink is overflowed
@@ -39,4 +41,15 @@
      * @return false if the element wasn't accepted.
      */
     public boolean offer(E elem, ISourceController<E> source);
+
+    /**
+     * Sets the executor to be used by the sink's {@link IFlowController}s.
+     * Implementors must set the provided executor for all of the controllers
+     * that it has open.
+     * 
+     * @param executor
+     *            The executor.
+     * @see ISinkController#setExecutor(Executor)
+     */
+    public void setFlowExecutor(Executor executor);
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Thu Mar 12 18:04:49 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.flow;
 
+import java.util.concurrent.Executor;
+
 public interface ISinkController<E> {
     /**
      * Defines required attributes for an entity that can be flow controlled.
@@ -90,6 +92,19 @@
      */
     public boolean addUnblockListener(FlowUnblockListener<E> listener);
 
+    /**
+     * Gets the {@link IFlowSink} that this controller is controlling. 
+     * @return The {@link IFlowSink} that this controller is controlling.
+     */
     public IFlowSink<E> getFlowSink();
+    
+    /**
+     * Sets the executor for this {@link ISinkController}. The executor is
+     * used to resume sources blocked by this controller. An executor must be
+     * set prior to using the controller.
+     * 
+     * @param executor The executor.
+     */
+    public void setExecutor(Executor executor);
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Thu Mar 12 18:04:49 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.flow;
 
+import java.util.concurrent.Executor;
 
 public class NoOpFlowController<E> implements ISinkController<E> {
     private final IFlowSource<E> source;
@@ -89,4 +90,8 @@
     public IFlowSink<E> getFlowSink() {
         return null;
     }
+
+    public void setExecutor(Executor executor) {
+        // Don't need an executor since we don't block.
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Thu Mar 12 18:04:49 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.flow;
 
 import java.util.ArrayList;
+import java.util.concurrent.Executor;
 
 public class PriorityFlowController<E> implements IFlowController<E> {
 
@@ -31,7 +32,7 @@
         this.controllable = controllable;
         this.flow = flow;
         this.mutex = mutex;
-        this.limiter =  limiter;
+        this.limiter = limiter;
         this.controllers = new ArrayList<FlowController<E>>(limiter.getPriorities());
         for (int i = 0; i < limiter.getPriorities(); i++) {
             controllers.add(new FlowController<E>(controllable, flow, limiter.getPriorityLimter(i), mutex));
@@ -69,7 +70,7 @@
     public void waitForFlowUnblock() throws InterruptedException {
         throw new UnsupportedOperationException();
     }
-    
+
     // /////////////////////////////////////////////////////////////////
     // ISourceController interface impl.
     // /////////////////////////////////////////////////////////////////
@@ -104,6 +105,12 @@
         return false;
     }
 
+    public void setExecutor(Executor executor) {
+        for (IFlowController<E> controller : controllers) {
+            controller.setExecutor(executor);
+        }
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Getters and Setters
     // /////////////////////////////////////////////////////////////////
@@ -111,4 +118,5 @@
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Thu Mar 12 18:04:49 2009
@@ -52,12 +52,10 @@
 
     public final boolean dispatch() {
 
-        while (pollingDispatch())
-            ;
+        // while (pollingDispatch());
+        // return true;
 
-        return true;
-
-        // return !pollingDispatch();
+        return !pollingDispatch();
     }
 
     public final IFlowSink<E> getFlowSink() {
@@ -87,6 +85,7 @@
         this.dispatcher = dispatcher;
         dispatchContext = dispatcher.register(this, getResourceName());
         dispatchContext.updatePriority(dispatchPriority);
+        super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
     }
 
     public synchronized final void setDispatchPriority(int priority) {
@@ -125,13 +124,6 @@
         }
 
         synchronized (this) {
-            if (dispatchContext != null) {
-                if (!dispatching) {
-                    dispatching = true;
-                    dispatchContext.requestDispatch();
-                }
-                return;
-            }
 
             if (notifyReady) {
                 notify();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Thu Mar 12 18:04:49 2009
@@ -90,22 +90,28 @@
     }
 
     public boolean pollingDispatch() {
-        PriorityNode node = null;
+        E elem = poll();
+        if (elem != null) {
+            drain.drain(elem, controller);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public final E poll() {
         synchronized (this) {
-            node = queue.poll();
+            PriorityNode node = queue.poll();
             // FIXME the release should really be done after dispatch.
             // doing it here saves us from having to resynchronize
             // after dispatch, but release limiter space too soon.
-            if (autoRelease && node != null) {
-                controller.elementDispatched(node.elem);
+            if (node != null) {
+                if (autoRelease) {
+                    controller.elementDispatched(node.elem);
+                }
+                return node.elem;
             }
-        }
-
-        if (node != null) {
-            drain.drain(node.elem, controller);
-            return true;
-        } else {
-            return false;
+            return null;
         }
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Thu Mar 12 18:04:49 2009
@@ -69,16 +69,7 @@
     }
 
     public final boolean pollingDispatch() {
-        E elem = null;
-        synchronized (this) {
-            elem = queue.poll();
-            // FIXME the release should really be done after dispatch.
-            // doing it here saves us from having to resynchronize
-            // after dispatch, but release limiter space too soon.
-            if (autoRelease && elem != null) {
-                controller.elementDispatched(elem);
-            }
-        }
+        E elem = poll();
 
         if (elem != null) {
             drain.drain(elem, controller);
@@ -88,6 +79,22 @@
         }
     }
 
+    /**
+     * Removes and returns the next queued element.
+     *
+     * @return the element taken from the queue, or null if the queue had
+     *         nothing to hand out
+     */
+    public final E poll() {
+        final E next;
+        synchronized (this) {
+            next = queue.poll();
+            // FIXME the release should really happen after dispatch. Doing it
+            // while the lock is held avoids re-synchronizing afterwards, but
+            // it releases limiter space too soon.
+            if (next != null && autoRelease) {
+                controller.elementDispatched(next);
+            }
+        }
+        return next;
+    }
+
     @Override
     public String toString() {
         return "SingleFlowQueue:" + getResourceName();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java Thu Mar 12 18:04:49 2009
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowRelay;
 
-public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IAsynchronousFlowSource<E>, IFlowSink<E> {
+public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IAsynchronousFlowSource<E>, IFlowRelay<E> {
 
     public void setDispatchPriority(int priority);
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java Thu Mar 12 18:04:49 2009
@@ -52,4 +52,11 @@
      */
     public boolean isDispatchReady();
 
+    /**
+     * Polls for the next element
+     * 
+     * @return The next element or null if none are ready
+     */
+    public E poll();
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Thu Mar 12 18:04:49 2009
@@ -198,24 +198,29 @@
             sink = readyConsumers.getHead().sink;
 
             // Get the next elem:
-            elem = queue.poll();
-            if (elem == null) {
-                return false;
-            }
+            elem = poll();
 
             readyConsumers.rotate();
+        }
+
+        sink.add(elem, sourceControler);
+        return true;
+    }
 
+    public final E poll() {
+        synchronized (this) {
+            E elem = queue.poll();
             // FIXME the release should really be done after dispatch.
             // doing it here saves us from having to resynchronize
-            // after dispatch, but releases limiter space too soon.
-            if (autoRelease) {
-                sinkController.elementDispatched(elem);
+            // after dispatch, but release limiter space too soon.
+            if (elem != null) {
+                if (autoRelease) {
+                    sinkController.elementDispatched(elem);
+                }
+                return elem;
             }
-
+            return null;
         }
-
-        sink.add(elem, sourceControler);
-        return true;
     }
 
     public final void addSink(IFlowSink<E> sink) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Thu Mar 12 18:04:49 2009
@@ -86,6 +86,28 @@
         return true;
     }
 
+    /**
+     * Polls one element from the sub-queue returned by peekReadyQueue().
+     *
+     * @return the polled element, or null if no sub-queue is ready or the
+     *         ready sub-queue returned nothing
+     */
+    public final E poll() {
+        // One monitor acquisition is sufficient: Java monitors are reentrant,
+        // so nesting a second synchronized (this) adds nothing.
+        synchronized (this) {
+            SingleFlowQueue queue = peekReadyQueue();
+            if (queue == null) {
+                return null;
+            }
+
+            E elem = queue.poll();
+            if (elem == null) {
+                // The sub-queue had nothing to offer after all.
+                unreadyQueue(queue);
+                return null;
+            }
+
+            // rotate to have fair dispatch.
+            queue.getList().rotate();
+            return elem;
+        }
+    }
+
     public final boolean isDispatchReady() {
         return !readyQueues.isEmpty();
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Thu Mar 12 18:04:49 2009
@@ -75,15 +75,15 @@
                     boolean notify = false;
                     if (node.cursor == null) {
                         readyDirectSubs.addLast(node);
-//                         System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
                     } else {
                         if (readyPollingSubs.isEmpty()) {
                             notify = !store.isEmpty();
                         }
                         readyPollingSubs.addLast(node);
-//                         System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
                     }
-                    
+
                     if (notify) {
                         notifyReady();
                     }
@@ -170,7 +170,7 @@
                         sub.resumeAt(node);
                         unreadyPollingSubs.addLast(sub);
                         matchCount++;
-//                         System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
                     }
                     sub = next;
                 }
@@ -182,7 +182,7 @@
                     subNode.unlink();
                     subNode.resumeAt(node);
                     unreadyPollingSubs.addLast(subNode);
-//                     System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
                 }
                 matchCount += matches.size();
 
@@ -237,8 +237,8 @@
 
     public boolean pollingDispatch() {
 
-//        System.out.println("polling dispatch");
-        
+        // System.out.println("polling dispatch");
+
         // Keep looping until we can find one subscription that we can
         // dispatch a message to.
         while (true) {
@@ -264,7 +264,7 @@
                     } else {
                         // Cursor dried up... this subscriber can now be direct
                         // dispatched.
-//                        System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
+                        // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
                         subNode.unlink();
                         readyDirectSubs.addLast(subNode);
                     }
@@ -294,7 +294,7 @@
                     }
                     return true;
                 } else {
-//                     System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
                     // Subscription is no longer ready..
                     subNode.cursorUnPeek(storeNode);
                     subNode.unlink();
@@ -304,6 +304,10 @@
         }
     }
 
+    /**
+     * Polling a single element is not supported by this queue.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException on every call
+     */
+    public final V poll() {
+        throw new UnsupportedOperationException("Not supported");
+    }
+
     public void addSubscription(Subscription<V> subscription) {
         synchronized (mutex) {
             SubscriptionNode node = subscriptions.get(subscription);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java Thu Mar 12 18:04:49 2009
@@ -88,23 +88,28 @@
     }
 
     public boolean pollingDispatch() {
-        PriorityNode elem = null;
+        E elem = poll();
+        if (elem != null) {
+            drain.drain(elem, controller);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public final E poll() {
         synchronized (this) {
-            elem = queue.poll();
-            updatePriority();
+            PriorityNode node = queue.poll();
             // FIXME the release should really be done after dispatch.
             // doing it here saves us from having to resynchronize
             // after dispatch, but release limiter space too soon.
-            if (autoRelease && elem != null) {
-                controller.elementDispatched(elem.elem);
+            if (node != null) {
+                if (autoRelease) {
+                    controller.elementDispatched(node.elem);
+                }
+                return node.elem;
             }
-        }
-
-        if (elem != null) {
-            drain.drain(elem.elem, controller);
-            return true;
-        } else {
-            return false;
+            return null;
         }
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java Thu Mar 12 18:04:49 2009
@@ -5,49 +5,48 @@
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowRelay;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
-public class SingleFlowRelay<E> extends AbstractLimitedFlowSource<E> implements
-		IFlowSink<E>, IFlowSource<E>, FlowControllable<E> {
+public class SingleFlowRelay<E> extends AbstractLimitedFlowSource<E> implements IFlowRelay<E>, FlowControllable<E> {
 
-	private final IFlowController<E> controller;
+    private final IFlowController<E> controller;
 
-	public SingleFlowRelay(Flow flow, String name, IFlowLimiter<E> limiter) {
-		super(name);
-		FlowController<E> c = new FlowController<E>(this, flow, limiter, this);
-		c.useOverFlowQueue(false);
-		controller = c;
-		super.onFlowOpened(controller);
-	}
-
-	public void add(E elem, ISourceController<E> source) {
-		controller.add(elem, source);
-
-	}
-
-	public boolean offer(E elem, ISourceController<E> source) {
-		return controller.offer(elem, source);
-	}
-
-	public void flowElemAccepted(ISourceController<E> controller, E elem) {
-		drain.drain(elem, controller);
-	}
-
-	public IFlowSink<E> getFlowSink() {
-		// TODO Auto-generated method stub
-		return this;
-	}
-
-	public IFlowSource<E> getFlowSource() {
-		// TODO Auto-generated method stub
-		return this;
-	}
-	
-	@Override
-	public String toString() {
-	    return getResourceName();
-	}
+    public SingleFlowRelay(Flow flow, String name, IFlowLimiter<E> limiter) {
+        super(name);
+        FlowController<E> c = new FlowController<E>(this, flow, limiter, this);
+        c.useOverFlowQueue(false);
+        controller = c;
+        super.onFlowOpened(controller);
+    }
+
+    public void add(E elem, ISourceController<E> source) {
+        controller.add(elem, source);
+    }
+
+    public boolean offer(E elem, ISourceController<E> source) {
+        return controller.offer(elem, source);
+    }
+
+    public void flowElemAccepted(ISourceController<E> controller, E elem) {
+        drain.drain(elem, controller);
+    }
+
+    public IFlowSink<E> getFlowSink() {
+        // TODO Auto-generated method stub
+        return this;
+    }
+
+    public IFlowSource<E> getFlowSource() {
+        // TODO Auto-generated method stub
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return getResourceName();
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/transport/DispatchableTransport.java Thu Mar 12 18:04:49 2009
@@ -2,8 +2,11 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 
-public interface DispatchableTransport extends Transport{
+public interface DispatchableTransport extends Transport {
 
-	public void setDispatcher(IDispatcher dispatcher);
-	public void setName(String name);
+    public void setDispatcher(IDispatcher dispatcher);
+
+    public void setDispatchPriority(int priority);
+
+    public void setName(String name);
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Thu Mar 12 18:04:49 2009
@@ -0,0 +1,649 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IPollableFlowSource;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
+import org.apache.activemq.transport.AsyncTransport;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.AsyncTransport.CommandSource;
+
+public abstract class AbstractTestConnection implements TransportListener, DeliveryTarget {
+
+    private static final boolean USE_RATE_BASED_LIMITER = false;
+    protected static final boolean USE_OUTPUT_QUEUE = true;
+    private static final boolean USE_ASYNC_COMMAND_QUEUE = true || USE_OUTPUT_QUEUE;
+
+    protected static final RateBasedLimiterCollector RATE_BASED_LIMITER_COLLECTOR = new RateBasedLimiterCollector();
+    protected static final AtomicBoolean inShutdown = new AtomicBoolean(true);
+
+    protected Transport transport;
+    protected AsyncTransport asyncTransport;
+    protected AsyncCommandQueue asyncCommandQueue;
+
+    protected boolean useInputQueue = false;
+    protected IFlowRelay<Message> inputQueue;
+
+    protected Flow outboundFlow;
+    protected IFlowRelay<Message> outputQueue;
+    protected IPollableFlowSource<Message> outputSource;
+    protected ProtocolLimiter<Message> outboundLimiter;
+
+    protected String name;
+
+    private int priorityLevels;
+
+    private final int outputWindowSize = 1000;
+    private final int outputResumeThreshold = 900;
+
+    private final int inputWindowSize = 1000;
+    private final int inputResumeThreshold = 500;
+
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
+    protected boolean blockingTransport = false;
+    ExecutorService blockingWriter;
+
+    /**
+     * Updates the shared shutdown flag. Further work happens only when the
+     * flag actually changes and the rate based limiter is enabled: entering
+     * shutdown calls shutdown() on the limiter collector, leaving shutdown
+     * hands the collector the given dispatcher.
+     *
+     * @param val the new value of the shutdown flag
+     * @param dispatcher dispatcher passed to the limiter collector when
+     *            shutdown is being left
+     */
+    public static void setInShutdown(boolean val, IDispatcher dispatcher) {
+        final boolean previous = inShutdown.getAndSet(val);
+        if (previous == val || !USE_RATE_BASED_LIMITER) {
+            return;
+        }
+
+        if (val) {
+            RATE_BASED_LIMITER_COLLECTOR.shutdown();
+        } else {
+            RATE_BASED_LIMITER_COLLECTOR.setDispatcher(dispatcher);
+        }
+    }
+
+    /**
+     * Selects how initialize() builds the inbound side: an ExclusiveQueue
+     * when true, a SingleFlowRelay when false.
+     *
+     * @param useInputQueue whether an inbound queue should be used
+     */
+    public void setUseInputQueue(boolean useInputQueue) {
+        this.useInputQueue = useInputQueue;
+    }
+
+    /**
+     * @return whether an inbound queue is used instead of a relay
+     */
+    public boolean isUseInputQueue() {
+        return useInputQueue;
+    }
+
+    /**
+     * Sets the transport this connection reads from and writes to.
+     *
+     * @param transport the transport to use
+     */
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    /**
+     * Registers this connection as the transport's listener and starts the
+     * transport. A DispatchableTransport is additionally given a name (when
+     * this connection has one), the dispatcher, and the highest dispatch
+     * priority index reported by the dispatcher.
+     *
+     * @throws Exception if the transport fails to start
+     */
+    public void start() throws Exception {
+        transport.setTransportListener(this);
+
+        if (transport instanceof DispatchableTransport) {
+            final DispatchableTransport dispatchable = (DispatchableTransport) transport;
+            if (name != null) {
+                dispatchable.setName(name + "-transport");
+            }
+            dispatchable.setDispatcher(getDispatcher());
+            final int topPriority = dispatcher.getDispatchPriorities() - 1;
+            dispatchable.setDispatchPriority(topPriority);
+        }
+
+        transport.start();
+    }
+
+    /**
+     * Marks the connection as stopping, then stops the transport and shuts
+     * down the outbound limiter and the blocking writer, skipping whichever
+     * of them is null.
+     *
+     * @throws Exception presumably propagated from transport.stop() --
+     *             confirm against Transport
+     */
+    public void stop() throws Exception {
+        // Set first so that error reporting and queued writes see the flag.
+        stopping.set(true);
+        if (transport != null) {
+            transport.stop();
+        }
+        if (outboundLimiter != null) {
+            outboundLimiter.shutdown();
+        }
+        if (blockingWriter != null) {
+            blockingWriter.shutdown();
+        }
+    }
+
+    /**
+     * Handles a command delivered by the transport. A Message is added to
+     * the input queue, a FlowControlBuffer is passed to the outbound limiter,
+     * and anything else is reported through onException(). Exceptions thrown
+     * while handling the command are reported the same way.
+     *
+     * @param command the command received from the transport
+     */
+    public void onCommand(Object command) {
+        try {
+            // Exact class comparison: subclasses are not matched.
+            if (command.getClass() == Message.class) {
+                Message msg = (Message) command;
+                inputQueue.add(msg, null);
+            } else if (command.getClass() == FlowControlBuffer.class) {
+                // Flow control from the peer: hand it to the outbound limiter.
+                FlowControl fc = (FlowControl) command;
+                outboundLimiter.onProtocolMessage(fc);
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Wires up the inbound and outbound flow paths of this connection.
+     * <p>
+     * Inbound: messages pass through a SingleFlowRelay or, when
+     * useInputQueue is set, an ExclusiveQueue; either one drains into
+     * messageReceived().
+     * <p>
+     * Outbound: a SingleFlowRelay draining into write() is used unless the
+     * output queue is enabled, the transport narrows to AsyncTransport and
+     * the transport is not a blocking one. In that case an ExclusiveQueue is
+     * used and polled by the AsyncCommandQueue.
+     * <p>
+     * Both limiters are started before this method returns.
+     */
+    protected void initialize() {
+        // Setup the input processing..
+        Flow flow = new Flow(name, false);
+        ProtocolLimiter<Message> limiter = createProtocolLimiter(false, flow, inputWindowSize, inputResumeThreshold);
+
+        if (!useInputQueue) {
+            inputQueue = new SingleFlowRelay<Message>(flow, name + "-inbound", limiter);
+        } else {
+            ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow, name + "-inbound", limiter);
+            queue.setDispatchPriority(0);
+            queue.setDispatcher(dispatcher);
+            queue.getFlowController(flow).useOverFlowQueue(false);
+            inputQueue = queue;
+        }
+        inputQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+        inputQueue.setDrain(new IFlowDrain<Message>() {
+
+            public void drain(Message message, ISourceController<Message> controller) {
+                messageReceived(controller, message);
+            }
+        });
+
+        if (USE_ASYNC_COMMAND_QUEUE) {
+            asyncTransport = transport.narrow(AsyncTransport.class);
+            if (asyncTransport != null) {
+                asyncCommandQueue = new AsyncCommandQueue();
+                asyncTransport.setCommandSource(asyncCommandQueue);
+            }
+        }
+
+        outboundFlow = new Flow(name, false);
+        outboundLimiter = createProtocolLimiter(true, outboundFlow, outputWindowSize, outputResumeThreshold);
+
+        if (transport.narrow(DispatchableTransport.class) == null) {
+            // A transport that cannot be dispatched is treated as blocking, so
+            // that write() hands its writes to the single-threaded
+            // blockingWriter created here. Leaving the flag false would create
+            // the executor without ever using it.
+            blockingTransport = true;
+            blockingWriter = Executors.newSingleThreadExecutor();
+        }
+
+        if (!USE_OUTPUT_QUEUE || asyncTransport == null || blockingTransport) {
+            outputQueue = new SingleFlowRelay<Message>(outboundFlow, name + "-outbound", outboundLimiter);
+            outputQueue.setDrain(new IFlowDrain<Message>() {
+                public void drain(final Message message, ISourceController<Message> controller) {
+                    write(message);
+                }
+            });
+        } else {
+            ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(outboundFlow, name + "-outbound", outboundLimiter);
+            outputQueue = queue;
+            outputSource = queue;
+            queue.addFlowReadyListener(asyncCommandQueue);
+        }
+        // Set the executor to be used by the queue's flow controllers:
+        outputQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+
+        limiter.start();
+        outboundLimiter.start();
+
+    }
+
+    /**
+     * Creates the limiter for one direction of this connection: a
+     * WindowLimiter by default, or a RateBasedLimiter when the rate based
+     * limiter is enabled.
+     *
+     * @param clientMode passed through to the limiter
+     * @param flow the flow the limiter belongs to
+     * @param capacity window capacity (WindowLimiter only)
+     * @param resumeThreshold resume threshold (WindowLimiter only)
+     * @return the newly created limiter
+     */
+    protected final ProtocolLimiter<Message> createProtocolLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+        if (!USE_RATE_BASED_LIMITER) {
+            return new WindowLimiter<Message>(clientMode, flow, capacity, resumeThreshold);
+        }
+        return new RateBasedLimiter(clientMode, flow);
+    }
+
+    /**
+     * Sends an object to the peer. With an async transport the object is
+     * queued on the async command queue. Otherwise, while holding the
+     * outputQueue monitor, it is either written directly or, for a blocking
+     * transport, handed to the blocking writer. I/O failures are reported
+     * through onException().
+     *
+     * @param o the object to send
+     */
+    protected final void write(final Object o) {
+        if (asyncTransport != null) {
+            asyncCommandQueue.addCommand(o);
+            return;
+        }
+
+        synchronized (outputQueue) {
+            if (blockingTransport) {
+                final Runnable deferredWrite = new Runnable() {
+                    public void run() {
+                        // Skip writes that were queued before stop() was called.
+                        if (stopping.get()) {
+                            return;
+                        }
+                        try {
+                            transport.oneway(o);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
+                    }
+                };
+                try {
+                    blockingWriter.execute(deferredWrite);
+                } catch (RejectedExecutionException re) {
+                    // Must be shutting down.
+                }
+            } else {
+                try {
+                    transport.oneway(o);
+                } catch (IOException e) {
+                    onException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Called with each message drained from the input queue.
+     *
+     * @param controller the source controller supplied by the drain
+     * @param elem the received message
+     */
+    protected abstract void messageReceived(ISourceController<Message> controller, Message elem);
+
+    public void onException(IOException error) {
+        onException((Exception) error);
+    }
+
+    public void onException(Exception error) {
+        if (!(stopping.get() || inShutdown.get())) {
+            System.out.println(transport.toString() + " - Connection error: " + error);
+            error.printStackTrace();
+        }
+    }
+
+    /** Transport interruption callback; intentionally does nothing. */
+    public void transportInterupted() {
+    }
+
+    /** Transport resumption callback; intentionally does nothing. */
+    public void transportResumed() {
+    }
+
+    /**
+     * Sets the name used to label this connection's flows, queues and
+     * transport.
+     *
+     * @param name the connection name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the connection name, or null if none was set
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return the configured number of priority levels
+     */
+    public int getPriorityLevels() {
+        return priorityLevels;
+    }
+
+    /**
+     * @param priorityLevels the number of priority levels to record
+     */
+    public void setPriorityLevels(int priorityLevels) {
+        this.priorityLevels = priorityLevels;
+    }
+
+    /**
+     * @return the dispatcher used by this connection
+     */
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    /**
+     * @param dispatcher the dispatcher this connection should use
+     */
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * @return the capacity passed to the outbound limiter
+     */
+    public int getOutputWindowSize() {
+        return outputWindowSize;
+    }
+
+    /**
+     * @return the resume threshold passed to the outbound limiter
+     */
+    public int getOutputResumeThreshold() {
+        return outputResumeThreshold;
+    }
+
+    /**
+     * @return the capacity passed to the inbound limiter
+     */
+    public int getInputWindowSize() {
+        return inputWindowSize;
+    }
+
+    /**
+     * @return the resume threshold passed to the inbound limiter
+     */
+    public int getInputResumeThreshold() {
+        return inputResumeThreshold;
+    }
+
+    /**
+     * @return the outbound queue, which is the sink for messages to be sent
+     *         over this connection
+     */
+    public IFlowSink<Message> getSink() {
+        return outputQueue;
+    }
+
+    /**
+     * Accepts every message.
+     *
+     * @param message the candidate message (not inspected)
+     * @return always true
+     */
+    public boolean match(Message message) {
+        return true;
+    }
+
+    private void sendFlowControl(int available) {
+        if (asyncTransport == null) {
+            FlowControlBean fc = new FlowControlBean();
+            fc.setCredit(available);
+            write(fc.freeze());
+        } else {
+            asyncCommandQueue.addFlowControlReady(available);
+        }
+    }
+
+    /**
+     * A flow limiter that also reacts to flow control messages received from
+     * the peer and has an explicit start/shutdown life cycle.
+     */
+    protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
+        /**
+         * Called with a flow control message received from the peer.
+         *
+         * @param m the received flow control message
+         */
+        public void onProtocolMessage(FlowControl m);
+
+        /** Called by initialize() once the connection's flows are set up. */
+        public void start();
+
+        /** Called by stop() when the connection is being stopped. */
+        public void shutdown();
+    }
+
+    /**
+     * Command source for an AsyncTransport. Commands are served in this
+     * order: pending flow control credit, explicitly queued commands, then
+     * messages polled from the output source.
+     * <p>
+     * State changes are made while holding the outputQueue monitor;
+     * delayable() reads its flag without taking it.
+     */
+    private class AsyncCommandQueue implements CommandSource, FlowReadyListener<Message> {
+        private final LinkedList<Object> outputCommandQueue = new LinkedList<Object>();
+        // Recorded when a poll comes back empty. notifyTransport() currently
+        // notifies unconditionally and does not consult this flag.
+        private boolean needsNotify = true;
+        // Flow control credit accumulated since the last credit command.
+        private int available = 0;
+        private boolean delayable = false;
+        // True while this object is registered as the output source's
+        // flow-ready listener.
+        private boolean registered = false;
+
+        /**
+         * Returns the next command to send.
+         *
+         * @return a flow control command, a queued command, a polled message,
+         *         or null if nothing is available
+         */
+        public Object pollNextCommand() {
+            synchronized (outputQueue) {
+                delayable = false;
+                if (available > 0) {
+                    // Pending credit goes out first, as a single command.
+                    FlowControlBean fc = new FlowControlBean();
+                    fc.setCredit(available);
+                    available = 0;
+                    return fc.freeze();
+                }
+
+                delayable = !outboundLimiter.getThrottled();
+                Object command = null;
+
+                if (!outputCommandQueue.isEmpty()) {
+                    command = outputCommandQueue.removeFirst();
+                    if (command instanceof Message) {
+                        delayable = !outboundLimiter.getThrottled();
+                    } else {
+                        delayable = false;
+                    }
+                    return command;
+                }
+
+                if (outputSource != null) {
+                    command = outputSource.poll();
+                    if (command != null) {
+                        return command;
+                    } else if (!registered) {
+                        // Nothing to poll: ask to be told when the source is
+                        // ready again.
+                        registered = true;
+                        outputSource.addFlowReadyListener(this);
+                    }
+                }
+
+                needsNotify = true;
+                return null;
+            }
+        }
+
+        /**
+         * @return the delayable flag computed by the most recent
+         *         pollNextCommand() call
+         */
+        public boolean delayable() {
+            return delayable;
+        }
+
+        /**
+         * Queues a command and notifies the transport.
+         *
+         * @param object the command to queue
+         */
+        public void addCommand(Object object) {
+            synchronized (outputQueue) {
+                outputCommandQueue.add(object);
+                notifyTransport();
+            }
+        }
+
+        /**
+         * Adds flow control credit to be sent by the next poll and notifies
+         * the transport.
+         *
+         * @param available the credit to add
+         */
+        public void addFlowControlReady(int available) {
+            synchronized (outputQueue) {
+                this.available += available;
+                notifyTransport();
+            }
+        }
+
+        /**
+         * Tells the transport that a command is ready. The notification is
+         * sent on every call.
+         */
+        public final void notifyTransport() {
+            asyncTransport.onCommandReady();
+        }
+
+        /**
+         * Flow-ready callback from the output source: clears the
+         * registration flag and notifies the transport.
+         *
+         * @param source the source that became ready
+         */
+        public void onFlowReady(IPollableFlowSource<Message> source) {
+            synchronized (outputQueue) {
+                registered = false;
+                notifyTransport();
+            }
+        }
+
+    }
+
+    /**
+     * Size limiter driven by credit. When not in client mode it accumulates
+     * the sizes removed from the limiter and sends them to the peer as flow
+     * control credit once enough has built up. Credit received from the peer
+     * is removed from the limiter.
+     */
+    protected class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
+        final Flow flow;
+        final boolean clientMode;
+        // Credit accumulated since the last flow control message was sent.
+        private int available;
+
+        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+            super(capacity, resumeThreshold);
+            this.flow = flow;
+            this.clientMode = clientMode;
+        }
+
+        /**
+         * Removes the given size from the limiter and, when not in client
+         * mode, adds it to the pending credit. The pending credit is sent
+         * and reset once it reaches capacity - resumeThreshold.
+         */
+        protected void remove(int size) {
+            super.remove(size);
+            if (clientMode) {
+                return;
+            }
+
+            available += size;
+            if (available < capacity - resumeThreshold) {
+                return;
+            }
+
+            sendFlowControl(available);
+            available = 0;
+        }
+
+        /**
+         * Applies credit received from the peer while holding the
+         * outputQueue monitor.
+         */
+        public void onProtocolMessage(FlowControl m) {
+            synchronized (outputQueue) {
+                remove(m.getCredit());
+            }
+        }
+
+        /** @return the flow limiter size reported by the message */
+        public int getElementSize(Message m) {
+            return m.getFlowLimiterSize();
+        }
+
+        /** Nothing to release. */
+        public void shutdown() {
+            // Noop
+        }
+
+        /** Nothing to start. */
+        public void start() {
+            // Noop
+        }
+    }
+
+    /**
+     * Periodically calls update() on every registered RateBasedLimiter. It
+     * re-schedules itself on the dispatcher every samplingPeriod milliseconds
+     * for as long as a dispatcher is set and at least one limiter remains.
+     */
+    protected static class RateBasedLimiterCollector implements Runnable {
+
+        private IDispatcher dispatcher;
+        /** Delay between sampling passes, in milliseconds. */
+        private int samplingPeriod = 50;
+        /** True from the moment a pass is handed to the dispatcher until run() finishes it. */
+        private boolean scheduled = false;
+        private HashSet<RateBasedLimiter> limiters = new HashSet<RateBasedLimiter>();
+
+        /** Sets the dispatcher; switching to a different one clears the scheduled flag first. */
+        public synchronized void setDispatcher(IDispatcher d) {
+            if (dispatcher != d) {
+                scheduled = false;
+            }
+            dispatcher = d;
+            scheduleNext();
+        }
+
+        /** Drops every registered limiter; with none left, no further pass is armed. */
+        public synchronized void shutdown() {
+            limiters.clear();
+        }
+
+        /** Registers a limiter; adding to an empty set attempts to arm the sampling task. */
+        public synchronized void addLimiter(RateBasedLimiter limiter) {
+            final boolean wasIdle = limiters.isEmpty();
+            limiters.add(limiter);
+            if (wasIdle) {
+                scheduleNext();
+            }
+        }
+
+        /** Unregisters a limiter so later passes no longer update it. */
+        public synchronized void removeLimiter(RateBasedLimiter limiter) {
+            limiters.remove(limiter);
+        }
+
+        /** One sampling pass: snapshot under the lock, update outside it, then re-arm. */
+        public void run() {
+            final ArrayList<RateBasedLimiter> snapshot;
+            synchronized (this) {
+                snapshot = new ArrayList<RateBasedLimiter>(limiters);
+            }
+
+            // update() takes outputQueue's monitor, so it runs without ours held.
+            for (RateBasedLimiter limiter : snapshot) {
+                limiter.update();
+            }
+
+            synchronized (this) {
+                scheduled = false;
+                scheduleNext();
+            }
+        }
+
+        /** Arms the next pass unless one is pending, no dispatcher is set, or nothing is registered. */
+        private synchronized void scheduleNext() {
+            if (dispatcher == null || scheduled || limiters.isEmpty()) {
+                return;
+            }
+            scheduled = true;
+            dispatcher.schedule(this, samplingPeriod, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** Congest/abate limiter: client mode meters add() per sample; otherwise it emits FlowControl 0/1. */
+    protected class RateBasedLimiter extends AbstractLimiter<Message> implements ProtocolLimiter<Message> {
+        final Flow flow;
+        final boolean clientMode;
+        private boolean throttled = false;
+
+        private long counter = 0;
+        private int limit = 0;
+
+        private float targetRate = 200000;
+        private final int quantum = 10000;
+        private final float lambda = .0001f;
+        boolean increase = true;
+
+        private static final boolean DEBUG = false;
+
+        public RateBasedLimiter(boolean clientMode, Flow flow) {
+            this.flow = flow;
+            this.clientMode = clientMode;
+            limit = currentLimit();
+        }
+
+        /** Size budget for one sampling period at the current targetRate. */
+        private int currentLimit() {
+            return (int) (targetRate * RATE_BASED_LIMITER_COLLECTOR.samplingPeriod * .0001f);
+        }
+
+        /** Client-mode limiters register with the shared collector so update() gets called. */
+        public void start() {
+            if (clientMode) {
+                RATE_BASED_LIMITER_COLLECTOR.addLimiter(this);
+            }
+        }
+
+        /** Client-mode limiters unregister from the shared collector. */
+        public void shutdown() {
+            if (clientMode) {
+                RATE_BASED_LIMITER_COLLECTOR.removeLimiter(this);
+            }
+        }
+
+        /** A credit of exactly 1 is handled as abate; any other credit as congest. */
+        public void onProtocolMessage(FlowControl m) {
+            synchronized (outputQueue) {
+                final boolean abating = m.getCredit() == 1;
+                if (abating) {
+                    abate();
+                } else {
+                    congest();
+                }
+            }
+        }
+
+        /** Lowers targetRate by one quantum, suppresses the next increase and throttles. */
+        private void congest() {
+            throttled = true;
+            increase = false;
+            targetRate -= quantum;
+            // Printed even though DEBUG is false: the DEBUG guard was left commented out.
+            debug("congest");
+        }
+
+        /** Only logs; the direct un-throttle that once lived here is disabled, update() clears the flag. */
+        private void abate() {
+            debug("abate"); // unconditional, like congest()
+        }
+
+        /** Sampling tick: resets the counter, adjusts targetRate, recomputes limit, lifts any throttle. */
+        public void update() {
+            synchronized (outputQueue) {
+                if (DEBUG)
+                    debug("Update");
+                counter = 0;
+                // Raise the rate by one quantum, except on the first tick after a congest().
+                if (!increase) {
+                    increase = true;
+                } else {
+                    targetRate += quantum;
+                }
+                limit = currentLimit();
+                if (!throttled) {
+                    return;
+                }
+                throttled = false;
+                notifyUnThrottleListeners();
+            }
+        }
+
+        /** Returns the throttled flag; in client mode elem's size is first counted against limit. */
+        public boolean add(Message elem) {
+            if (!clientMode) {
+                return throttled;
+            }
+            counter += elem.getFlowLimiterSize();
+            if (counter >= limit) {
+                throttled = true;
+            }
+            if (DEBUG)
+                debug("Add");
+            return throttled;
+        }
+
+        public boolean canAdd(Message elem) {
+            return !throttled;
+        }
+
+        public boolean getThrottled() {
+            return throttled;
+        }
+
+        public void remove(Message elem) { /* Noop */ }
+
+        /** Non-client side, if throttled: notifies listeners, clears the flag, sends FlowControl 1. */
+        public void releaseReserved() {
+            if (clientMode || !throttled) {
+                return;
+            }
+            notifyUnThrottleListeners();
+            throttled = false;
+            sendFlowControl(1);
+        }
+
+        /** Non-client side, if not yet throttled: sets the flag and sends FlowControl 0. */
+        public void reserve(Message elem) {
+            if (clientMode || throttled) {
+                return;
+            }
+            throttled = true;
+            sendFlowControl(0);
+        }
+
+        private void debug(String str) {
+            System.out.println(AbstractTestConnection.this.name + " " + str + " count/limit/throttled: " + counter + "/" + limit + "/" + throttled);
+        }
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java Thu Mar 12 18:04:49 2009
@@ -1,5 +1,51 @@
 package org.apache.activemq.flow;
 
-public class BrokerConnection extends RemoteConnection {
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
 
+public class BrokerConnection extends AbstractTestConnection {
+
+    private MockBroker broker;
+
+    /** Dispatches one inbound command by its exact runtime class; any failure goes to onException. */
+    public void onCommand(Object command) {
+        try {
+            if (name == null) {
+                // The first command on a connection carries the connection name.
+                name = "broker-" + (String) command;
+                initialize();
+                return;
+            }
+            final Class<?> type = command.getClass();
+            if (type == Message.class) {
+                inputQueue.add((Message) command, null);
+            } else if (type == DestinationBuffer.class) {
+                // Subscription request for the given destination.
+                broker.subscribe((Destination) command, this);
+            } else if (type == FlowControlBuffer.class) {
+                // Flow-control message: hand it to the outbound limiter.
+                outboundLimiter.onProtocolMessage((FlowControl) command);
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    /** Routes a received message through the broker's router, then reports it as dispatched. */
+    protected final void messageReceived(ISourceController<Message> controller, Message elem) {
+        broker.router.route(controller, elem);
+        controller.elementDispatched(elem);
+    }
+
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+    }
+
+    public MockBroker getBroker() {
+        return broker;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=752957&r1=752956&r2=752957&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Thu Mar 12 18:04:49 2009
@@ -22,6 +22,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.transport.DispatchableTransportServer;
 import org.apache.activemq.transport.Transport;
@@ -29,7 +30,7 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 
-class MockBroker implements TransportAcceptListener {
+public class MockBroker implements TransportAcceptListener {
 
     public interface DeliveryTarget {
         public IFlowSink<Message> getSink();
@@ -39,17 +40,24 @@
 
     final Router router = new Router();
 
-    final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
-    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
-    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+    final ArrayList<BrokerConnection> connections = new ArrayList<BrokerConnection>();
     final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
     final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
 
     private TransportServer transportServer;
     private String uri;
     private String name;
-    private IDispatcher dispatcher;
+    protected IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
+    private boolean useInputQueues = false;
+
+    public boolean isUseInputQueues() { // flag that onAccept passes to each new connection's setUseInputQueue
+        return useInputQueues;
+    }
+
+    public void setUseInputQueues(boolean useInputQueues) { // read in onAccept when a connection is created
+        this.useInputQueues = useInputQueues;
+    }
 
     public String getName() {
         return name;
@@ -68,35 +76,34 @@
         queues.put(queue.getDestination(), queue);
     }
 
-//    public void createClusterConnection(Destination destination) {
-//        RemoteConsumer c = new RemoteConsumer(this.mockBrokerTest, "consumer" + ++consumerCounter, this, destination);
-//        consumers.add(c);
-//        router.bind(c, destination);
-//    }
-//    public void createBrokerConnection(MockBroker target, Pipe<Message> pipe) {
-//        BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this, target, pipe);
-//        // Set up the pipe for polled access
-//        if (dispatchMode != AbstractTestConnection.BLOCKING) {
-//            pipe.setMode(Pipe.POLLING);
-//        }
-//        // Add subscriptions for the target's destinations:
-//        for (Destination d : target.router.lookupTable.keySet()) {
-//            router.bind(bc, d);
-//        }
-//        brokerConns.add(bc);
-//    }
+    // public void createClusterConnection(Destination destination) {
+    // RemoteConsumer c = new RemoteConsumer(this.mockBrokerTest, "consumer" +
+    // ++consumerCounter, this, destination);
+    // consumers.add(c);
+    // router.bind(c, destination);
+    // }
+    // public void createBrokerConnection(MockBroker target, Pipe<Message> pipe)
+    // {
+    // BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this,
+    // target, pipe);
+    // // Set up the pipe for polled access
+    // if (dispatchMode != AbstractTestConnection.BLOCKING) {
+    // pipe.setMode(Pipe.POLLING);
+    // }
+    // // Add subscriptions for the target's destinations:
+    // for (Destination d : target.router.lookupTable.keySet()) {
+    // router.bind(bc, d);
+    // }
+    // brokerConns.add(bc);
+    // }
+
+    public final void stopServices() throws Exception {
+        AbstractTestConnection.setInShutdown(true, dispatcher);
 
-    final void stopServices() throws Exception {
         stopping.set(true);
         transportServer.stop();
-        
-        for (RemoteProducer connection : producers) {
-            connection.stop();
-        }
-        for (RemoteConsumer connection : consumers) {
-            connection.stop();
-        }
-        for (RemoteConnection connection : connections) {
+
+        for (BrokerConnection connection : connections) {
             connection.stop();
         }
         for (BrokerConnection connection : brokerConnections) {
@@ -105,13 +112,10 @@
         for (MockQueue queue : queues.values()) {
             queue.stop();
         }
-        dispatcher.shutdown();
-
     }
 
-    final void startServices() throws Exception {
-
-        dispatcher.start();
+    public final void startServices() throws Exception {
+        AbstractTestConnection.setInShutdown(false, dispatcher);
 
         transportServer = TransportFactory.bind(new URI(uri));
         transportServer.setAcceptListener(this);
@@ -124,25 +128,18 @@
             queue.start();
         }
 
-        for (RemoteConsumer connection : consumers) {
-            connection.start();
-        }
-
-        for (RemoteProducer connection : producers) {
-            connection.start();
-        }
-
         for (BrokerConnection connection : brokerConnections) {
             connection.start();
         }
     }
 
     public void onAccept(final Transport transport) {
-        RemoteConnection connection = new RemoteConnection();
+        BrokerConnection connection = new BrokerConnection();
         connection.setBroker(this);
         connection.setTransport(transport);
         connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
         connection.setDispatcher(dispatcher);
+        connection.setUseInputQueue(useInputQueues);
         connections.add(connection);
         try {
             connection.start();
@@ -184,4 +181,46 @@
         return stopping.get();
     }
 
+    protected void createDispatcher() { // builds a default dispatcher only when none has been set yet
+        if (dispatcher == null) {
+            dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, Runtime.getRuntime().availableProcessors()); // NOTE(review): args look like pool name, priority levels, thread count - confirm against PriorityDispatcher
+        }
+    }
+
+    /**
+     * Runs the broker as a standalone app bound to the given URI.
+     *
+     * @param args
+     *            args[0] is the bind URI; if it is missing an error is printed and main returns.
+     */
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("Must supply a bind uri");
+            return; // without this, args[0] below throws ArrayIndexOutOfBoundsException
+        }
+        String uri = args[0];
+
+        final MockBroker broker = new MockBroker();
+        broker.setUri(uri);
+        broker.setName("Broker");
+        broker.createDispatcher();
+        try {
+            broker.getDispatcher().start();
+            broker.startServices();
+        } catch (Exception e) {
+            e.printStackTrace(); // startup failure is reported; the shutdown hook is still installed below
+        }
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                try {
+                    broker.stopServices();
+                } catch (Exception e) {
+                    // Nothing left to recover at JVM shutdown; just report the failure.
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
 }
\ No newline at end of file



Mime
View raw message