Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 80518 invoked from network); 20 Feb 2009 14:23:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Feb 2009 14:23:50 -0000 Received: (qmail 88505 invoked by uid 500); 20 Feb 2009 14:23:49 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 88477 invoked by uid 500); 20 Feb 2009 14:23:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 88467 invoked by uid 99); 20 Feb 2009 14:23:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Feb 2009 06:23:49 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Feb 2009 14:23:47 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5F3E923888A3; Fri, 20 Feb 2009 14:23:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r746251 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ test/java/org/apache/activemq/flow/ Date: Fri, 20 Feb 2009 14:23:26 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090220142327.5F3E923888A3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 20 14:23:26 2009 New Revision: 746251 URL: http://svn.apache.org/viewvc?rev=746251&view=rev Log: Applying Colin's patch https://issues.apache.org/activemq/browse/AMQ-2129 Thanks! Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java?rev=746251&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java Fri Feb 20 14:23:26 2009 @@ -0,0 +1,158 @@ +package org.apache.activemq.dispatch; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +public abstract class AbstractPooledDispatcher implements IDispatcher, PooledDispatcher { + + private final String name; + + private final ThreadLocal dispatcher = new ThreadLocal(); + private final ThreadLocal> dispatcherContext = new ThreadLocal>(); + private final ArrayList dispatchers = new ArrayList(); + + final AtomicBoolean started = new AtomicBoolean(); + final AtomicBoolean shutdown = new AtomicBoolean(); + + private int roundRobinCounter = 0; + private final int size; + + protected ExecutionLoadBalancer loadBalancer; + + protected AbstractPooledDispatcher(String name, int size) { + this.name = name; + this.size = size; + loadBalancer = new SimpleLoadBalancer(); + } + + /** + * Subclasses should implement this to return a new dispatcher. + * + * @param name + * The name to assign the dispatcher. + * @param pool + * The pool. + * @return The new dispathcer. + */ + protected abstract D createDispatcher(String name, AbstractPooledDispatcher pool) throws Exception; + + /** + * @see org.apache.activemq.dispatch.IDispatcher#start() + */ + public synchronized final void start() throws Exception { + loadBalancer.start(); + if (started.compareAndSet(false, true)) { + // Create all the workers. + try { + for (int i = 0; i < size; i++) { + D dispatacher = createDispatcher(name + "-" + (i + 1), this); + + dispatchers.add(dispatacher); + dispatacher.start(); + } + } catch (Exception e) { + shutdown(); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#shutdown() + */ + public synchronized final void shutdown() throws InterruptedException { + shutdown.set(true); + boolean interrupted = false; + while (!dispatchers.isEmpty()) { + try { + dispatchers.get(dispatchers.size() - 1).shutdown(); + } catch (InterruptedException ie) { + interrupted = true; + continue; + } + dispatchers.remove(dispatchers.size() - 1); + + } + // Re-interrupt: + if (interrupted) { + Thread.currentThread().interrupt(); + } + + loadBalancer.stop(); + } + + public void setCurrentDispatchContext(PooledDispatchContext context) { + dispatcherContext.set(context); + } + + public PooledDispatchContext getCurrentDispatchContext() { + return dispatcherContext.get(); + } + + /** + * Returns the currently executing dispatcher, or null if the current thread + * is not a dispatcher: + * + * @return The currently executing dispatcher + */ + public D getCurrentDispatcher() { + return dispatcher.get(); + } + + /** + * A Dispatcher must call this to indicate that is has started it's dispatch + * loop. + */ + public void onDispatcherStarted(D d) { + dispatcher.set(d); + loadBalancer.addDispatcher(d); + } + + public ExecutionLoadBalancer getLoadBalancer() { + return loadBalancer; + } + + /** + * A Dispatcher must call this when exiting it's dispatch loop + */ + public void onDispatcherStopped(D d) { + loadBalancer.removeDispatcher(d); + } + + protected D chooseDispatcher() { + D d = dispatcher.get(); + if (d == null) { + synchronized (dispatchers) { + if (++roundRobinCounter >= size) { + roundRobinCounter = 0; + } + return dispatchers.get(roundRobinCounter); + } + } else { + return d; + } + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable, + * long, java.util.concurrent.TimeUnit) + */ + public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) { + chooseDispatcher().schedule(runnable, delay, timeUnit); + } + + public DispatchContext register(Dispatchable dispatchable, String name) { + return chooseDispatcher().register(dispatchable, name); + } + + public String toString() { + return name; + } + +} Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Fri Feb 20 14:23:26 2009 @@ -16,24 +16,23 @@ */ package org.apache.activemq.dispatch; -import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher; import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext; -public interface ExecutionLoadBalancer { +public interface ExecutionLoadBalancer { - public interface ExecutionTracker { - public void onDispatchRequest(PoolableDispatcher caller, PooledDispatchContext context); + public interface ExecutionTracker { + public void onDispatchRequest(D caller, PooledDispatchContext context); - public void close(); - } + public void close(); + } - public void addDispatcher(PoolableDispatcher dispatcher); + public void addDispatcher(D dispatcher); - public void removeDispatcher(PoolableDispatcher dispatcher); + public void removeDispatcher(D dispatcher); - public ExecutionTracker createExecutionTracker(PooledDispatchContext context); + public ExecutionTracker createExecutionTracker(PooledDispatchContext context); - public void start(); + public void start(); - public void stop(); + public void stop(); } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 20 14:23:26 2009 @@ -79,10 +79,10 @@ public void close(); } - class RunnableAdapter implements Dispatchable { + public class RunnableAdapter implements Dispatchable { final Runnable runnable; - RunnableAdapter(Runnable runnable) { + public RunnableAdapter(Runnable runnable) { this.runnable = runnable; } @@ -117,7 +117,7 @@ /** * Starts the dispatcher. */ - public void start(); + public void start() throws Exception; /** * Shuts down the dispatcher, this may result in previous dispatch requests Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java Fri Feb 20 14:23:26 2009 @@ -16,86 +16,66 @@ */ package org.apache.activemq.dispatch; +import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker; import org.apache.activemq.dispatch.IDispatcher.DispatchContext; -public interface PooledDispatcher { +public interface PooledDispatcher { - /** - * A {@link PooledDispatchContext}s can be moved between different - * dispatchers. - */ - public interface PooledDispatchContext extends DispatchContext { - /** - * Called to transfer a {@link PooledDispatchContext} to a new - * Dispatcher. - */ - public void assignToNewDispatcher(PoolableDispatcher newDispatcher); - - /** - * A dispatcher must call this when it starts dispatch for this context - */ - public void startingDispatch(); - - /** - * A dispatcher must call this when it has finished dispatching a - * context - */ - public void finishedDispatch(); - - /** - * Called by the dispatch thread to let the pooled context set any info - * set by other threads. - */ - public void processForeignUpdates(); - } - - public interface PoolableDispatchContext extends DispatchContext { - - public void setPooledDispatchContext(PooledDispatchContext context); - - /** - * Indicates that another thread has made an update to the dispatch - * context. - * - */ - public void onForeignThreadUpdate(); - - public PoolableDispatcher getDispatcher(); - } - - /** - * A PoolableDispatcher is one that can be owned by an - * {@link PooledDispatcher}. - */ - public interface PoolableDispatcher extends IDispatcher { - - /** - * Indicates that another thread has made an update to the dispatch - * context. - * - */ - public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name); - } - - /** - * This wraps the dispatch context into one that is load balanced by the - * LoadBalancer - * - * @param context - * The context to wrap. - * @return - */ - public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context); - - /** - * A Dispatcher must call this from it's dispatcher thread to indicate that - * is has started it's dispatch has started. - */ - public void onDispatcherStarted(PoolableDispatcher dispatcher); - - /** - * A Dispatcher must call this from it's dispatcher thread when exiting it's - * dispatch loop - */ - public void onDispatcherStopped(PoolableDispatcher dispatcher); + /** + * A {@link PooledDispatchContext}s can be moved between different + * dispatchers. + */ + public interface PooledDispatchContext extends DispatchContext { + /** + * Called to transfer a {@link PooledDispatchContext} to a new + * Dispatcher. + */ + public void assignToNewDispatcher(D newDispatcher); + + /** + * Gets the dispatcher to which this PooledDispatchContext currently + * belongs + * + * @return + */ + public D getDispatcher(); + + /** + * Gets the execution tracker for the context. + * + * @return the execution tracker for the context: + */ + public ExecutionTracker getExecutionTracker(); + } + + /** + * A Dispatcher must call this from it's dispatcher thread to indicate that + * is has started it's dispatch has started. + */ + public void onDispatcherStarted(D dispatcher); + + /** + * A Dispatcher must call this from it's dispatcher thread when exiting it's + * dispatch loop + */ + public void onDispatcherStopped(D dispatcher); + + /** + * Returns the currently executing dispatcher, or null if the current thread + * is not a dispatcher: + * + * @return The currently executing dispatcher + */ + public D getCurrentDispatcher(); + + public void setCurrentDispatchContext(PooledDispatchContext context); + + public PooledDispatchContext getCurrentDispatchContext(); + + /** + * Returns the load balancer for this dispatch pool. + * + * @return + */ + public ExecutionLoadBalancer getLoadBalancer(); } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 20 14:23:26 2009 @@ -16,449 +16,564 @@ */ package org.apache.activemq.dispatch; -import java.util.LinkedList; -import java.util.TreeMap; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatchContext; -import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher; +import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker; import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext; import org.apache.activemq.queue.Mapper; import org.apache.kahadb.util.LinkedNode; import org.apache.kahadb.util.LinkedNodeList; -public class PriorityDispatcher implements Runnable, PoolableDispatcher { +public class PriorityDispatcher> implements Runnable, IDispatcher { - private Thread thread; - private boolean running = false; - private boolean threaded = false; - private final int MAX_USER_PRIORITY; - - static final ThreadLocal dispatcher = new ThreadLocal(); - - private final PooledDispatcher pooledDispatcher; - - // The local dispatch queue: - private final PriorityLinkedList priorityQueue; - - // Dispatch queue for requests from other threads: - private final LinkedNodeList[] foreignQueue; - private static final int[] TOGGLE = new int[] { 1, 0 }; - private int foreignToggle = 0; - - // Timed Execution List - private final TimerHeap timerHeap = new TimerHeap(); - - private final String name; - private final AtomicBoolean foreignAvailable = new AtomicBoolean(false); - private final Semaphore foreignPermits = new Semaphore(0); - - private final Mapper PRIORITY_MAPPER = new Mapper() { - public Integer map(PriorityDispatchContext element) { - return element.listPrio; - } - }; - - public PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) { - this.name = name; - MAX_USER_PRIORITY = priorities; - priorityQueue = new PriorityLinkedList(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER); - foreignQueue = createForeignEventQueue(); - for (int i = 0; i < 2; i++) { - foreignQueue[i] = new LinkedNodeList(); - } - this.pooledDispatcher = pooledDispactcher; - } - - @SuppressWarnings("unchecked") - private LinkedNodeList[] createForeignEventQueue() { - return new LinkedNodeList[2]; - } - - private abstract class ForeignEvent extends LinkedNode { - public abstract void execute(); - - final void addToList() { - synchronized (foreignQueue) { - if (!this.isLinked()) { - foreignQueue[foreignToggle].addLast(this); - if (!foreignAvailable.getAndSet(true)) { - foreignPermits.release(); - } - } - } - } - } - - public boolean isThreaded() { - return threaded; - } - - public void setThreaded(boolean threaded) { - this.threaded = threaded; - } - - private class UpdateEvent extends ForeignEvent { - private final PriorityDispatchContext pdc; - - UpdateEvent(PriorityDispatchContext pdc) { - this.pdc = pdc; - } - - // Can only be called by the owner of this dispatch context: - public void execute() { - pdc.poolContext.processForeignUpdates(); - } - } - - class PriorityDispatchContext extends LinkedNode implements PoolableDispatchContext { - // The dispatchable target: - final Dispatchable dispatchable; - PooledDispatchContext poolContext; - // The name of this context: - final String name; - // list prio can only be updated in the thread of of this dispatcher: - int listPrio; - // The update events are used to update fields in the dispatch context - // from foreign threads: - final UpdateEvent updateEvent[] = new UpdateEvent[] { new UpdateEvent(this), new UpdateEvent(this) }; - - private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) { - super(); - this.dispatchable = dispatchable; - this.name = name; - } - - // This can only be called on this thread - public final void requestDispatch() { - if (!isLinked()) { - priorityQueue.add(this, listPrio); - } - return; - } - - // This can only be called on this thread - public final void updatePriority(int priority) { - if (priority != listPrio) { - - listPrio = priority; - // If there is a priority change relink the context - // at the new priority: - if (isLinked()) { - unlink(); - priorityQueue.add(this, listPrio); - } - } - return; - - } - - public void onForeignThreadUpdate() { - synchronized (foreignQueue) { - updateEvent[foreignToggle].addToList(); - } - } - - // This can only be called on this thread - public void close() { - if (isLinked()) { - unlink(); - } - synchronized (foreignQueue) { - if (updateEvent[foreignToggle].isLinked()) { - updateEvent[foreignToggle].unlink(); - } - } - } - - /** - * This can only be called by the owning dispatch thread: - * - * @return False if the dispatchable has more work to do. - */ - public final boolean dispatch() { - return dispatchable.dispatch(); - } - - public String toString() { - return name; - } - - public Dispatchable getDispatchable() { - return dispatchable; - } - - public void setPooledDispatchContext(PooledDispatchContext context) { - this.poolContext = context; - } - - public String getName() { - return name; - } - - public PoolableDispatcher getDispatcher() { - return PriorityDispatcher.this; - } - } - - public DispatchContext register(Dispatchable dispatchable, String name) { - return createPoolableDispatchContext(dispatchable, name); - } - - public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name) { - return new PriorityDispatchContext(dispatchable, true, name); - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#start() - */ - public synchronized final void start() { - if (thread == null) { - running = true; - thread = new Thread(this, name); - thread.start(); - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#shutdown() - */ - public synchronized final void shutdown() throws InterruptedException { - if (thread != null) { - dispatch(new RunnableAdapter(new Runnable() { - - public void run() { - running = false; - } - - }), MAX_USER_PRIORITY + 1); - thread.interrupt(); - thread.join(); - thread = null; - } - } - - public void run() { - - // Inform the dispatcher that we have started: - pooledDispatcher.onDispatcherStarted(this); - dispatcher.set(this); - PriorityDispatchContext pdc; - try { - while (running) { - pdc = priorityQueue.poll(); - // If no local work available wait for foreign work: - if (pdc == null) { - foreignPermits.acquire(); - } else { - pdc.poolContext.startingDispatch(); - - while (!pdc.dispatch()) { - // If there is a higher priority dispatchable stop - // processing this one: - if (pdc.listPrio < priorityQueue.getHighestPriority()) { - // May have gotten relinked by the caller: - if (!pdc.isLinked()) { - priorityQueue.add(pdc, pdc.listPrio); - } - break; - } - } - - pdc.poolContext.finishedDispatch(); - - } - - // Execute delayed events: - timerHeap.executeReadyEvents(); - - // Check for foreign dispatch requests: - if (foreignAvailable.get()) { - LinkedNodeList foreign; - synchronized (foreignQueue) { - // Swap foreign queues and drain permits; - foreign = foreignQueue[foreignToggle]; - foreignToggle = TOGGLE[foreignToggle]; - foreignAvailable.set(false); - foreignPermits.drainPermits(); - } - while (true) { - ForeignEvent fe = foreign.getHead(); - if (fe == null) { - break; - } - - fe.unlink(); - fe.execute(); - } - - } - } - } catch (InterruptedException e) { - return; - } catch (Throwable thrown) { - thrown.printStackTrace(); - } finally { - pooledDispatcher.onDispatcherStopped(this); - } - } - - class ThreadSafeDispatchContext implements PooledDispatchContext { - final PriorityDispatchContext delegate; - - ThreadSafeDispatchContext(PriorityDispatchContext context) { - this.delegate = context; - delegate.setPooledDispatchContext(this); - } - - public void finishedDispatch() { - // NOOP - - } - - public void startingDispatch() { - // Noop - - } - - public void close() { - // Noop this is always transient: - } - - public void processForeignUpdates() { - requestDispatch(); - } - - public Dispatchable getDispatchable() { - return delegate.getDispatchable(); - } - - public void requestDispatch() { - if (dispatcher.get() == PriorityDispatcher.this) { - delegate.requestDispatch(); - } else { - delegate.onForeignThreadUpdate(); - } - } - - public void updatePriority(int priority) { - throw new UnsupportedOperationException("Not implemented"); - } - - public String getName() { - return delegate.name; - } - - public void assignToNewDispatcher(PoolableDispatcher newDispatcher) { - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq - * .dispatch.Dispatcher.Dispatchable) - */ - final void dispatch(Dispatchable dispatchable, int priority) { - ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name)); - context.delegate.updatePriority(priority); - context.requestDispatch(); - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int) - */ - public Executor createPriorityExecutor(final int priority) { - - return new Executor() { - - public void execute(final Runnable runnable) { - dispatch(new RunnableAdapter(runnable), priority); - } - }; - } - - public void execute(final Runnable runnable) { - dispatch(new RunnableAdapter(runnable), 0); - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable, - * long, java.util.concurrent.TimeUnit) - */ - public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) { - if (dispatcher.get() == this) { - timerHeap.add(runnable, delay, timeUnit); - } else { - new ForeignEvent() { - public void execute() { - timerHeap.add(runnable, delay, timeUnit); - } - }.addToList(); - } - } - - public String toString() { - return name; - } - - private class TimerHeap { - - final TreeMap> timers = new TreeMap>(); - - private void add(Runnable runnable, long delay, TimeUnit timeUnit) { - - long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS); - long eTime = System.nanoTime() + nanoDelay; - LinkedList list = new LinkedList(); - list.add(runnable); - - LinkedList old = timers.put(eTime, list); - if (old != null) { - list.addAll(old); - } - } - - private void executeReadyEvents() { - LinkedList ready = null; - if (timers.isEmpty()) { - return; - } else { - long now = System.nanoTime(); - long first = timers.firstKey(); - if (first > now) { - return; - } - ready = new LinkedList(); - - while (first < now) { - ready.addAll(timers.remove(first)); - if (timers.isEmpty()) { - break; - } - first = timers.firstKey(); - - } - } - - for (Runnable runnable : ready) { - try { - runnable.run(); - } catch (Throwable thrown) { - thrown.printStackTrace(); - } - } - } - } + private static final boolean DEBUG = false; + private Thread thread; + protected boolean running = false; + private boolean threaded = false; + private final int MAX_USER_PRIORITY; + + // Set if this dispatcher is part of a dispatch pool: + protected final PooledDispatcher pooledDispatcher; + + // The local dispatch queue: + private final PriorityLinkedList priorityQueue; + + // Dispatch queue for requests from other threads: + private final LinkedNodeList[] foreignQueue; + private static final int[] TOGGLE = new int[] { 1, 0 }; + private int foreignToggle = 0; + + // Timed Execution List + protected final TimerHeap timerHeap = new TimerHeap(); + + private final String name; + private final AtomicBoolean foreignAvailable = new AtomicBoolean(false); + private final Semaphore foreignPermits = new Semaphore(0); + + private final Mapper PRIORITY_MAPPER = new Mapper() { + public Integer map(PriorityDispatchContext element) { + return element.listPrio; + } + }; + + protected PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) { + this.name = name; + MAX_USER_PRIORITY = priorities; + priorityQueue = new PriorityLinkedList(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER); + foreignQueue = createForeignEventQueue(); + for (int i = 0; i < 2; i++) { + foreignQueue[i] = new LinkedNodeList(); + } + this.pooledDispatcher = pooledDispactcher; + } + + public static final IDispatcher createPriorityDispatcher(String name, int numPriorities) { + return new PriorityDispatcher(name, numPriorities, null); + } + + public static final IDispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) { + return new AbstractPooledDispatcher(name, size) { + + @Override + protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher pool) throws Exception { + // TODO Auto-generated method stub + return new PriorityDispatcher(name, numPriorities, this); + } + + public final Executor createPriorityExecutor(final int priority) { + return new Executor() { + public void execute(final Runnable runnable) { + chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority); + } + }; + } + }; + } + + @SuppressWarnings("unchecked") + private LinkedNodeList[] createForeignEventQueue() { + return new LinkedNodeList[2]; + } + + protected abstract class ForeignEvent extends LinkedNode { + public abstract void execute(); + + final void addToList() { + synchronized (foreignQueue) { + if (!this.isLinked()) { + foreignQueue[foreignToggle].addLast(this); + if (!foreignAvailable.getAndSet(true)) { + wakeup(); + } + } + } + } + } + + public boolean isThreaded() { + return threaded; + } + + public void setThreaded(boolean threaded) { + this.threaded = threaded; + } + + private class UpdateEvent extends ForeignEvent { + private final PriorityDispatchContext pdc; + + UpdateEvent(PriorityDispatchContext pdc) { + this.pdc = pdc; + } + + // Can only be called by the owner of this dispatch context: + public void execute() { + pdc.processForeignUpdates(); + } + } + + public DispatchContext register(Dispatchable dispatchable, String name) { + return new PriorityDispatchContext(dispatchable, true, name); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#start() + */ + public synchronized final void start() { + if (thread == null) { + running = true; + thread = new Thread(this, name); + thread.start(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#shutdown() + */ + public synchronized final void shutdown() throws InterruptedException { + if (thread != null) { + dispatch(new RunnableAdapter(new Runnable() { + + public void run() { + running = false; + } + + }), MAX_USER_PRIORITY + 1); + // thread.interrupt(); + thread.join(); + thread = null; + } + } + + public void run() { + + // Inform the dispatcher that we have started: + pooledDispatcher.onDispatcherStarted((D) this); + PriorityDispatchContext pdc; + try { + while (running) { + pdc = priorityQueue.poll(); + // If no local work available wait for foreign work: + if (pdc == null) { + waitForEvents(); + } else { + if (pdc.tracker != null) { + pooledDispatcher.setCurrentDispatchContext(pdc); + } + + while (!pdc.dispatch()) { + // If there is a higher priority dispatchable stop + // processing this one: + if (pdc.listPrio < priorityQueue.getHighestPriority()) { + // May have gotten relinked by the caller: + if (!pdc.isLinked()) { + priorityQueue.add(pdc, pdc.listPrio); + } + break; + } + } + + pooledDispatcher.setCurrentDispatchContext(null); + } + + // Execute delayed events: + timerHeap.executeReadyTimers(); + + // Allow subclasses to do additional work: + dispatchHook(); + + // Check for foreign dispatch requests: + if (foreignAvailable.get()) { + LinkedNodeList foreign; + synchronized (foreignQueue) { + // Swap foreign queues and drain permits; + foreign = foreignQueue[foreignToggle]; + foreignToggle = TOGGLE[foreignToggle]; + foreignAvailable.set(false); + foreignPermits.drainPermits(); + } + while (true) { + ForeignEvent fe = foreign.getHead(); + if (fe == null) { + break; + } + + fe.unlink(); + fe.execute(); + } + + } + } + } catch (InterruptedException e) { + return; + } catch (Throwable thrown) { + thrown.printStackTrace(); + } finally { + pooledDispatcher.onDispatcherStopped((D) this); + } + } + + /** + * Subclasses may override this to do do additional dispatch work: + */ + protected void dispatchHook() throws Exception { + + } + + /** + * Subclasses may override this to implement another mechanism for wakeup. + * + * @throws Exception + */ + protected void waitForEvents() throws Exception { + foreignPermits.acquire(); + } + + /** + * Subclasses may override this to provide an alternative wakeup mechanism. + */ + protected void wakeup() { + foreignPermits.release(); + } + + protected final void onForeignUdate(PriorityDispatchContext context) { + synchronized (foreignQueue) { + + ForeignEvent fe = context.updateEvent[foreignToggle]; + if (!fe.isLinked()) { + foreignQueue[foreignToggle].addLast(fe); + if (!foreignAvailable.getAndSet(true)) { + wakeup(); + } + } + } + } + + protected final boolean removeDispatchContext(PriorityDispatchContext context) { + synchronized (foreignQueue) { + + if (context.updateEvent[0].isLinked()) { + context.updateEvent[0].unlink(); + } + if (context.updateEvent[1].isLinked()) { + context.updateEvent[1].unlink(); + } + if (context.isLinked()) { + context.unlink(); + return true; + } + } + return false; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq + * .dispatch.Dispatcher.Dispatchable) + */ + public final void dispatch(Dispatchable dispatchable, int priority) { + PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name); + context.updatePriority(priority); + context.requestDispatch(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int) + */ + public Executor createPriorityExecutor(final int priority) { + + return new Executor() { + + public void execute(final Runnable runnable) { + dispatch(new RunnableAdapter(runnable), priority); + } + }; + } + + public void execute(final Runnable runnable) { + dispatch(new RunnableAdapter(runnable), 0); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable, + * long, java.util.concurrent.TimeUnit) + */ + public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) { + if (getCurrentDispatcher() == this) { + timerHeap.add(runnable, delay, timeUnit); + } else { + new ForeignEvent() { + public void execute() { + timerHeap.add(runnable, delay, timeUnit); + } + }.addToList(); + } + } + + public String toString() { + return name; + } + + private final D getCurrentDispatcher() { + return pooledDispatcher.getCurrentDispatcher(); + } + + private final PooledDispatchContext getCurrentDispatchContext() { + return pooledDispatcher.getCurrentDispatchContext(); + } + + /** + * + */ + protected class PriorityDispatchContext extends LinkedNode implements PooledDispatchContext { + // The dispatchable target: + final Dispatchable dispatchable; + // The name of this context: + final String name; + // list prio can only be updated in the thread of of the owning + // dispatcher + protected int listPrio; + + // The update events are used to update fields in the dispatch context + // from foreign threads: + final UpdateEvent updateEvent[]; + + private final ExecutionTracker tracker; + private D currentOwner; + private D updateDispatcher = null; + + private int priority; + private boolean dispatchRequested = false; + private boolean closed = false; + + protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) { + this.dispatchable = dispatchable; + this.name = name; + this.currentOwner = (D) PriorityDispatcher.this; + if (persistent) { + this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this); + } else { + this.tracker = null; + } + updateEvent = createUpdateEvent(); + updateEvent[0] = new UpdateEvent(this); + updateEvent[1] = new UpdateEvent(this); + } + + @SuppressWarnings("unchecked") + private final PriorityDispatcher.UpdateEvent[] createUpdateEvent() { + return new PriorityDispatcher.UpdateEvent[2]; + } + + /** + * Gets the execution tracker for the context. + * + * @return the execution tracker for the context: + */ + public ExecutionTracker getExecutionTracker() { + return tracker; + } + + /** + * This can only be called by the owning dispatch thread: + * + * @return False if the dispatchable has more work to do. + */ + public final boolean dispatch() { + return dispatchable.dispatch(); + } + + public final void assignToNewDispatcher(D newDispatcher) { + synchronized (this) { + + // If we're already set to this dispatcher + if (newDispatcher == currentOwner) { + if (updateDispatcher == null || updateDispatcher == newDispatcher) { + return; + } + } + + updateDispatcher = newDispatcher; + if (DEBUG) + System.out.println(getName() + " updating to " + updateDispatcher); + } + currentOwner.onForeignUdate(this); + } + + public void requestDispatch() { + + D callingDispatcher = getCurrentDispatcher(); + if (tracker != null) + tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext()); + + // Otherwise this is coming off another thread, so we need to + // synchronize + // to protect against ownership changes: + synchronized (this) { + // If the owner of this context is the calling thread, then + // delegate to the dispatcher. + if (currentOwner == callingDispatcher) { + + if (!isLinked()) { + currentOwner.priorityQueue.add(this, listPrio); + } + return; + } + + dispatchRequested = true; + } + // FIXME Thread safety! + currentOwner.onForeignUdate(this); + } + + public void updatePriority(int priority) { + if (this.priority == priority) { + return; + } + D callingDispatcher = getCurrentDispatcher(); + + // Otherwise this is coming off another thread, so we need to + // synchronize to protect against ownership changes: + synchronized (this) { + this.priority = priority; + + // If this is called by the owning dispatcher, then we go ahead + // and update: + if (currentOwner == callingDispatcher) { + + if (priority != listPrio) { + + listPrio = priority; + // If there is a priority change relink the context + // at the new priority: + if (isLinked()) { + unlink(); + currentOwner.priorityQueue.add(this, listPrio); + } + } + return; + } + } + // FIXME Thread safety! + currentOwner.onForeignUdate(this); + } + + public void processForeignUpdates() { + boolean ownerChange = false; + synchronized (this) { + + if (closed) { + close(); + return; + } + + if (updateDispatcher != null) { + if (DEBUG) { + System.out.println("Assigning " + getName() + " to " + updateDispatcher); + } + if (currentOwner.removeDispatchContext(this)) { + dispatchRequested = true; + } + currentOwner = updateDispatcher; + updateDispatcher = null; + ownerChange = true; + } else { + updatePriority(priority); + + if (dispatchRequested) { + dispatchRequested = false; + requestDispatch(); + } + } + } + + if (ownerChange) { + currentOwner.onForeignUdate(this); + } + } + + /** + * May be overriden by subclass to additional work on dispatcher switch + * + * @param oldDispatcher + * The old dispatcher + * @param newDispatcher + * The new Dispatcher + */ + protected void switchedDispatcher(D oldDispatcher, D newDispatcher) { + + } + + public void close() { + tracker.close(); + D callingDispatcher = getCurrentDispatcher(); + synchronized (this) { + closed = true; + + // If the owner of this context is the calling thread, then + // delegate to the dispatcher. + if (currentOwner == callingDispatcher) { + if (isLinked()) { + unlink(); + } + // FIXME Deadlock potential! + synchronized (foreignQueue) { + if (updateEvent[foreignToggle].isLinked()) { + updateEvent[foreignToggle].unlink(); + } + } + } + } + currentOwner.onForeignUdate(this); + } + + public final String toString() { + return getName(); + } + + public Dispatchable getDispatchable() { + return dispatchable; + } + + public D getDispatcher() { + return currentOwner; + } + + public String getName() { + return name; + } + } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Fri Feb 20 14:23:26 2009 @@ -1,307 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.dispatch; - -import java.util.ArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker; - -public class PriorityPooledDispatcher implements IDispatcher, PooledDispatcher { - private final String name; - - private static final ThreadLocal dispatchContext = new ThreadLocal(); - private static final ThreadLocal dispatcher = new ThreadLocal(); - - private final ArrayList dispatchers = new ArrayList(); - - final AtomicBoolean started = new AtomicBoolean(); - final AtomicBoolean shutdown = new AtomicBoolean(); - - private int roundRobinCounter = 0; - private final int size; - private final boolean DEBUG = false; - - private final ExecutionLoadBalancer loadBalancer; - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int) - */ - public Executor createPriorityExecutor(final int priority) { - return new Executor() { - public void execute(final Runnable runnable) { - chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority); - } - }; - } - - public PriorityPooledDispatcher(String name, int size, int priorities) { - this.name = name; - this.size = size; - loadBalancer = new SimpleLoadBalancer(); - // Create all the workers. - for (int i = 0; i < size; i++) { - PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, this); - dispatchers.add(dispatcher); - } - } - - public DispatchContext register(Dispatchable dispatchable, String name) { - return createPooledDispatchContext(chooseDispatcher().createPoolableDispatchContext(dispatchable, name)); - } - - /** - * @see org.apache.activemq.dispatch.IDispatcher#start() - */ - public synchronized final void start() { - loadBalancer.start(); - if (started.compareAndSet(false, true)) { - // Create all the workers. - for (int i = 0; i < size; i++) { - dispatchers.get(i).start(); - } - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#shutdown() - */ - public synchronized final void shutdown() throws InterruptedException { - shutdown.set(true); - for (PriorityDispatcher dispatcher : dispatchers) { - dispatcher.shutdown(); - } - loadBalancer.stop(); - } - - private PriorityDispatcher chooseDispatcher() { - PriorityDispatcher d = PriorityDispatcher.dispatcher.get(); - if (d == null) { - synchronized (dispatchers) { - if (++roundRobinCounter >= size) { - roundRobinCounter = 0; - } - return dispatchers.get(roundRobinCounter); - } - } else { - return d; - } - } - - public void execute(final Runnable runnable) { - chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0); - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable, - * long, java.util.concurrent.TimeUnit) - */ - public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) { - chooseDispatcher().schedule(runnable, delay, timeUnit); - } - - public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context) { - return new PriorityPooledDispatchContext(context); - } - - /** - * A Dispatcher must call this to indicate that is has started it's dispatch - * loop. - */ - public void onDispatcherStarted(PoolableDispatcher d) { - dispatcher.set(d); - loadBalancer.addDispatcher(d); - } - - /** - * A Dispatcher must call this when exiting it's dispatch loop - */ - public void onDispatcherStopped(PoolableDispatcher d) { - loadBalancer.removeDispatcher(d); - } - - /** - * ExecutionGraphNode tracks dispatch information for a - * MappableDispatchContext. - * - */ - public class PriorityPooledDispatchContext implements PooledDispatchContext { - private final ExecutionTracker tracker; - - private PoolableDispatchContext context; - private PoolableDispatcher currentOwner; - private int priority; - private boolean dispatchRequested = false; - private PoolableDispatcher updateDispatcher = null; - private boolean closed = false; - - PriorityPooledDispatchContext(PoolableDispatchContext context) { - this.context = context; - this.context.setPooledDispatchContext(this); - this.currentOwner = context.getDispatcher(); - this.tracker = loadBalancer.createExecutionTracker(this); - - } - - public final void startingDispatch() { - dispatchContext.set(this); - } - - public final void finishedDispatch() { - dispatchContext.set(null); - } - - public final void assignToNewDispatcher(PoolableDispatcher newDispatcher) { - synchronized (this) { - - // If we're already set to this dispatcher - if (newDispatcher == currentOwner) { - if (updateDispatcher == null || updateDispatcher == newDispatcher) { - return; - } - } - - updateDispatcher = newDispatcher; - if (DEBUG) - System.out.println(getName() + " updating to " + context.getDispatcher()); - } - context.onForeignThreadUpdate(); - } - - public void requestDispatch() { - - PoolableDispatcher callingDispatcher = dispatcher.get(); - - tracker.onDispatchRequest(callingDispatcher, dispatchContext.get()); - - // Otherwise this is coming off another thread, so we need to - // synchronize - // to protect against ownership changes: - synchronized (this) { - // If the owner of this context is the calling thread, then - // delegate to the dispatcher. - if (currentOwner == callingDispatcher) { - - context.requestDispatch(); - return; - } - - dispatchRequested = true; - } - context.onForeignThreadUpdate(); - } - - public void updatePriority(int priority) { - if (this.priority == priority) { - return; - } - // Otherwise this is coming off another thread, so we need to - // synchronize to protect against ownership changes: - synchronized (this) { - this.priority = priority; - - IDispatcher callingDispatcher = dispatcher.get(); - - // If the owner of this context is the calling thread, then - // delegate to the dispatcher. - if (currentOwner == callingDispatcher) { - - context.updatePriority(priority); - return; - } - } - context.onForeignThreadUpdate(); - } - - public void processForeignUpdates() { - boolean ownerChange = false; - synchronized (this) { - - if (closed) { - context.close(); - return; - } - - if (updateDispatcher != null) { - // Close the old context: - if (DEBUG) { - System.out.println("Assigning " + getName() + " to " + updateDispatcher); - } - context.close(); - - currentOwner = updateDispatcher; - updateDispatcher = null; - context = currentOwner.createPoolableDispatchContext(context.getDispatchable(), context.getName()); - dispatchRequested = true; - context.updatePriority(priority); - context.setPooledDispatchContext(this); - ownerChange = true; - } else { - context.updatePriority(priority); - - if (dispatchRequested) { - context.requestDispatch(); - dispatchRequested = false; - } - } - } - - if (ownerChange) { - context.onForeignThreadUpdate(); - } - } - - public void close() { - tracker.close(); - synchronized (this) { - IDispatcher callingDispatcher = dispatcher.get(); - - // If the owner of this context is the calling thread, then - // delegate to the dispatcher. - if (currentOwner == callingDispatcher) { - context.close(); - return; - } - } - context.onForeignThreadUpdate(); - } - - public final String toString() { - return context.toString(); - } - - public Dispatchable getDispatchable() { - return context.getDispatchable(); - } - - public String getName() { - return context.getName(); - } - } - - public String toString() { - return name; - } -} Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Fri Feb 20 14:23:26 2009 @@ -19,117 +19,119 @@ import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher; import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext; -public class SimpleLoadBalancer implements ExecutionLoadBalancer { +public class SimpleLoadBalancer implements ExecutionLoadBalancer { - private final boolean DEBUG = false; + private final boolean DEBUG = false; - SimpleLoadBalancer() { - } + public SimpleLoadBalancer() { + } - private class ExecutionStats { - final PooledDispatchContext target; - final PooledDispatchContext source; - int count; - - ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) { - this.target = target; - this.source = source; - } - - public String toString() { - return "Connection from: " + source + " to " + target; - } - } - - public void addDispatcher(PoolableDispatcher dispatcher) { - - } - - public void removeDispatcher(PoolableDispatcher dispatcher) { - - } - - public void start() { - } - - public void stop() { - } - - public ExecutionTracker createExecutionTracker(PooledDispatchContext context) { - return new SimpleExecutionTracker(context); - } - - private class SimpleExecutionTracker implements ExecutionTracker { - private final HashMap sources = new HashMap(); - private final PooledDispatchContext context; - private final AtomicInteger work = new AtomicInteger(0); - - private PooledDispatchContext singleSource; - private PoolableDispatcher currentOwner; - - SimpleExecutionTracker(PooledDispatchContext context) { - this.context = context; - } - - /** - * This method is called to track which dispatch contexts are requesting - * dispatch for the target context represented by this node. - * - * This method is not threadsafe, the caller must ensure serialized - * access to this method. - * - * @param callngDispatcher - * The calling dispatcher. - * @param context - * the originating dispatch context - * @return True if this method resulted in the dispatch request being - * assigned to another dispatcher. - */ - public void onDispatchRequest(PoolableDispatcher callingDispatcher, PooledDispatchContext callingContext) { - - if (callingContext != null) { - // Make sure we are being called by another node: - if (callingContext == null || callingContext == context) { - return; - } - - // Optimize for single source case: - if (singleSource != callingContext) { - if (singleSource == null && sources.isEmpty()) { - singleSource = callingContext; - ExecutionStats stats = new ExecutionStats(callingContext, context); - sources.put(callingContext, stats); - - // If this context only has a single source - // assign it to that source to minimize contention: - if (callingDispatcher != currentOwner) { - currentOwner = callingDispatcher; - if (DEBUG) - System.out.println("Assigning: " + context + " to " + callingContext + "'s dispatcher: " + callingDispatcher); - context.assignToNewDispatcher(callingDispatcher); - } - - } else { - - ExecutionStats stats = sources.get(callingContext); - if (stats == null) { - stats = new ExecutionStats(callingContext, context); - sources.put(callingContext, stats); - } - - if (singleSource != null) { - singleSource = null; - } - } - } - work.incrementAndGet(); - } - } - - public void close() { - } - } + @SuppressWarnings("hiding") + private class ExecutionStats { + final PooledDispatchContext target; + final PooledDispatchContext source; + int count; + + ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) { + this.target = target; + this.source = source; + } + + public String toString() { + return "Connection from: " + source + " to " + target; + } + } + + public void addDispatcher(D dispatcher) { + + } + + public void removeDispatcher(D dispatcher) { + + } + + public void start() { + } + + public void stop() { + } + + public ExecutionTracker createExecutionTracker(PooledDispatchContext context) { + return new SimpleExecutionTracker(context); + } + + private class SimpleExecutionTracker implements ExecutionTracker { + private final HashMap, ExecutionStats> sources = new HashMap, ExecutionStats>(); + private final PooledDispatchContext context; + private final AtomicInteger work = new AtomicInteger(0); + + private PooledDispatchContext singleSource; + private IDispatcher currentOwner; + + SimpleExecutionTracker(PooledDispatchContext context) { + this.context = context; + currentOwner = context.getDispatcher(); + } + + /** + * This method is called to track which dispatch contexts are requesting + * dispatch for the target context represented by this node. + * + * This method is not threadsafe, the caller must ensure serialized + * access to this method. + * + * @param callngDispatcher + * The calling dispatcher. + * @param context + * the originating dispatch context + * @return True if this method resulted in the dispatch request being + * assigned to another dispatcher. + */ + public void onDispatchRequest(D callingDispatcher, PooledDispatchContext callingContext) { + + if (callingContext != null) { + // Make sure we are being called by another node: + if (callingContext == null || callingContext == context) { + return; + } + + // Optimize for single source case: + if (singleSource != callingContext) { + if (singleSource == null && sources.isEmpty()) { + singleSource = callingContext; + ExecutionStats stats = new ExecutionStats(callingContext, context); + sources.put(callingContext, stats); + + // If this context only has a single source + // assign it to that source to minimize contention: + if (callingDispatcher != currentOwner) { + if (DEBUG) + System.out.println("Assigning: " + context + " to " + callingContext + "'s dispatcher: " + callingDispatcher + " From: " + currentOwner); + + currentOwner = callingDispatcher; + context.assignToNewDispatcher(callingDispatcher); + } + + } else { + + ExecutionStats stats = sources.get(callingContext); + if (stats == null) { + stats = new ExecutionStats(callingContext, context); + sources.put(callingContext, stats); + } + + if (singleSource != null) { + singleSource = null; + } + } + } + work.incrementAndGet(); + } + } + + public void close() { + } + } } Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java?rev=746251&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java Fri Feb 20 14:23:26 2009 @@ -0,0 +1,71 @@ +package org.apache.activemq.dispatch; + +import java.util.LinkedList; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +public class TimerHeap { + final TreeMap> timers = new TreeMap>(); + + public final void add(Runnable runnable, long delay, TimeUnit timeUnit) { + + long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS); + long eTime = System.nanoTime() + nanoDelay; + LinkedList list = new LinkedList(); + list.add(runnable); + + LinkedList old = timers.put(eTime, list); + if (old != null) { + list.addAll(old); + } + } + + /** + * Returns the time of the next scheduled event. + * @return -1 if there are no events, otherwise the time that the next timer should fire. + */ + public final long timeToNext() { + if(timers.isEmpty()) + { + return -1; + } + else + { + return Math.max(0, timers.firstKey() - System.nanoTime()); + } + } + + /** + * Executes ready timers. + */ + public final void executeReadyTimers() { + LinkedList ready = null; + if (timers.isEmpty()) { + return; + } else { + long now = System.nanoTime(); + long first = timers.firstKey(); + if (first > now) { + return; + } + ready = new LinkedList(); + + while (first < now) { + ready.addAll(timers.remove(first)); + if (timers.isEmpty()) { + break; + } + first = timers.firstKey(); + + } + } + + for (Runnable runnable : ready) { + try { + runnable.run(); + } catch (Throwable thrown) { + thrown.printStackTrace(); + } + } + } +} Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Fri Feb 20 14:23:26 2009 @@ -37,14 +37,14 @@ public boolean match(Message message); } - final Router router= new Router(); - + final Router router = new Router(); + final ArrayList connections = new ArrayList(); final ArrayList producers = new ArrayList(); final ArrayList consumers = new ArrayList(); final ArrayList brokerConnections = new ArrayList(); final HashMap queues = new HashMap(); - + private TransportServer transportServer; private String uri; private String name; @@ -63,7 +63,6 @@ } } - public void addQueue(MockQueue queue) { router.bind(queue, queue.getDestination()); queues.put(queue.getDestination(), queue); @@ -111,17 +110,15 @@ } final void startServices() throws Exception { - + + dispatcher.start(); + transportServer = TransportFactory.bind(new URI(uri)); transportServer.setAcceptListener(this); - if(transportServer instanceof DispatchableTransportServer) - { - ((DispatchableTransportServer)transportServer).setDispatcher(dispatcher); + if (transportServer instanceof DispatchableTransportServer) { + ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher); } transportServer.start(); - - - dispatcher.start(); for (MockQueue queue : queues.values()) { queue.start(); @@ -130,7 +127,7 @@ for (RemoteConsumer connection : consumers) { connection.start(); } - + for (RemoteProducer connection : producers) { connection.start(); } @@ -155,7 +152,7 @@ } public void onAcceptError(Exception error) { - System.out.println("Accept error: "+error); + System.out.println("Accept error: " + error); error.printStackTrace(); } @@ -186,5 +183,5 @@ public boolean isStopping() { return stopping.get(); } - + } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 20 14:23:26 2009 @@ -24,7 +24,7 @@ import junit.framework.TestCase; import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.dispatch.PriorityPooledDispatcher; +import org.apache.activemq.dispatch.PriorityDispatcher; import org.apache.activemq.flow.Commands.Destination; import org.apache.activemq.flow.Commands.Destination.DestinationBean; import org.apache.activemq.flow.Commands.Destination.DestinationBuffer; @@ -91,6 +91,8 @@ @Override protected void setUp() throws Exception { + dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize); + if( tcp ) { sendBrokerURI = "tcp://localhost:10000?wireFormat=proto"; receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto"; @@ -105,6 +107,21 @@ } } + public void test_1_1_0() throws Exception { + producerCount = 1; + destCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + public void test_1_1_1() throws Exception { producerCount = 1; destCount = 1; @@ -340,9 +357,7 @@ private void createConnections() throws IOException, URISyntaxException { - dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY); FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY)); - if (multibroker) { sendBroker = createBroker("SendBroker", sendBrokerURI); rcvBroker = createBroker("RcvBroker", receiveBrokerURI); Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=746251&r1=746250&r2=746251&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Fri Feb 20 14:23:26 2009 @@ -50,7 +50,7 @@ if(transport instanceof DispatchableTransport) { DispatchableTransport dt = ((DispatchableTransport)transport); - dt.setName(name); + dt.setName(name + "-client-transport"); dt.setDispatcher(getDispatcher()); } super.setTransport(transport); @@ -59,7 +59,7 @@ transport.start(); // Let the remote side know our name. transport.oneway(name); - dispatchContext = getDispatcher().register(this, name + "-producer"); + dispatchContext = getDispatcher().register(this, name + "-client"); dispatchContext.requestDispatch(); }