From commits-return-10203-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Feb 11 20:12:55 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 25114 invoked from network); 11 Feb 2009 20:12:55 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Feb 2009 20:12:55 -0000 Received: (qmail 27745 invoked by uid 500); 11 Feb 2009 20:12:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 27687 invoked by uid 500); 11 Feb 2009 20:12:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 27678 invoked by uid 99); 11 Feb 2009 20:12:54 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2009 12:12:54 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2009 20:12:52 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 906E82388AE7; Wed, 11 Feb 2009 20:12:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r743476 [1/4] - in /activemq/sandbox/activemq-flow: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/act... Date: Wed, 11 Feb 2009 20:12:30 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090211201232.906E82388AE7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Feb 11 20:12:28 2009 New Revision: 743476 URL: http://svn.apache.org/viewvc?rev=743476&view=rev Log: Initial spike of the flow module. This is basically an experiment of a different flow control and threading model to see if we can make a faster/simpler version of the broker. This code was jointly developed between Colin Macnaughton (yes, he has an icla on file) and me. Will follow up with a post to the dev list to try to entice more eyes to look at this to see if it sparks any interest. Added: activemq/sandbox/activemq-flow/pom.xml activemq/sandbox/activemq-flow/src/ activemq/sandbox/activemq-flow/src/main/ activemq/sandbox/activemq-flow/src/main/java/ activemq/sandbox/activemq-flow/src/main/java/com/ activemq/sandbox/activemq-flow/src/main/java/com/progress/ activemq/sandbox/activemq-flow/src/main/java/org/ activemq/sandbox/activemq-flow/src/main/java/org/apache/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java activemq/sandbox/activemq-flow/src/test/ activemq/sandbox/activemq-flow/src/test/java/ activemq/sandbox/activemq-flow/src/test/java/com/ activemq/sandbox/activemq-flow/src/test/java/com/progress/ activemq/sandbox/activemq-flow/src/test/java/org/ activemq/sandbox/activemq-flow/src/test/java/org/apache/ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java Added: activemq/sandbox/activemq-flow/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/pom.xml (added) +++ activemq/sandbox/activemq-flow/pom.xml Wed Feb 11 20:12:28 2009 @@ -0,0 +1,52 @@ + + + + + 4.0.0 + + + org.apache.activemq + activemq-parent + 5.3-SNAPSHOT + + + org.apache.activemq.flow + activemq-flow + jar + 1.0-SNAPSHOT + + ActiveMQ :: Flow + + + + junit + junit + + + colt + colt + 1.2.0 + compile + + + org.apache.activemq + activemq-core + + + + Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import org.apache.activemq.dispatch.IDispatcher.DispatchContext; + +public interface ExecutionLoadBalancer { + + /** + * A Load Balanced Dispatch context can be moved between different + * dispatchers. + */ + public interface LoadBalancedDispatchContext extends DispatchContext { + /** + * A dispatcher must call this when it starts dispatch for this context + */ + public void startingDispatch(); + + /** + * A dispatcher must call this when it has finished dispatching a + * context + */ + public void finishedDispatch(); + + /** + * + */ + public void processForeignUpdates(); + } + + public interface PoolableDispatchContext extends DispatchContext { + + public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context); + + /** + * Indicates that another thread has made an update to the dispatch + * context. + * + */ + public void onForeignThreadUpdate(); + + public PoolableDispatcher getDispatcher(); + } + + public interface PoolableDispatcher extends IDispatcher { + + /** + * Indicates that another thread has made an update to the dispatch + * context. + * + */ + public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name); + } + + /** + * This wraps the dispatch context into one that is load balanced by the + * LoadBalancer + * + * @param context + * The context to wrap. + * @return + */ + public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context); + + /** + * Adds a Dispatcher to the list of dispatchers managed by the load balancer + * + * @param dispatcher + */ + public void addDispatcher(PoolableDispatcher dispatcher); + + /** + * A Dispatcher must call this from it's dispatcher thread to indicate that + * is has started it's dispatch has started. + */ + public void onDispatcherStarted(PoolableDispatcher dispatcher); + + /** + * A Dispatcher must call this from it's dispatcher thread when exiting it's + * dispatch loop + */ + public void onDispatcherStopped(PoolableDispatcher dispatcher); +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +public interface IDispatcher { + + /** + * This interface is implemented by Dispatchable entities. A Dispatchable + * entity registers with an {@link IDispatcher} and is returned a + * {@link DispatchContext} which it can use to request the + * {@link IDispatcher} to invoke {@link Dispatchable#dispatch()} + * + * {@link IDispatcher} guarantees that {@link #dispatch()} will never invoke + * dispatch concurrently unless the {@link Dispatchable} is registered with + * more than one {@link IDispatcher}; + */ + public interface Dispatchable { + public boolean dispatch(); + } + + /** + * Returned to callers registered with this dispathcer. Used by the caller + * to inform the dispatcher that it is ready for dispatch. + * + * Note that DispatchContext is not safe for concurrent access by multiple + * threads. + */ + public interface DispatchContext { + /** + * Once registered with a dispatcher, this can be called to request + * dispatch. The {@link Dispatchable} will remain in the dispatch queue + * until a subsequent call to {@link Dispatchable#dispatch()} returns + * false; + */ + public void requestDispatch(); + + /** + * This can be called to update the dispatch priority. + * + * @param priority + */ + public void updatePriority(int priority); + + /** + * Gets the Dispatchable that this context represents. + * + * @return The dispatchable + */ + public Dispatchable getDispatchable(); + + /** + * Gets the name of the dispatch context + * + * @return The dispatchable + */ + public String getName(); + + /** + * This must be called to release any resource the dispatcher is holding + * on behalf of this context. + */ + public void close(); + } + + class RunnableAdapter implements Dispatchable { + final Runnable runnable; + + RunnableAdapter(Runnable runnable) { + this.runnable = runnable; + } + + public boolean dispatch() { + runnable.run(); + return true; + } + } + + /** + * Registers a {@link Dispatchable} with this dispatcher, and returns a + * {@link DispatchContext} that the caller can use to request dispatch. + * + * @param dispatchable + * The {@link Dispatchable} + * @param name + * An identifier for the dispatcher. + * @return A {@link DispatchContext} that can be used to request dispatch + */ + public DispatchContext register(Dispatchable dispatchable, String name); + + /** + * Creates an executor that will execute its tasks at the specified + * priority. + * + * @param priority + * The priority + * @return A prioritized executor. + */ + public Executor createPriorityExecutor(int priority); + + /** + * Starts the dispatcher. + */ + public void start(); + + /** + * Shuts down the dispatcher, this may result in previous dispatch requests + * going unserved. + */ + public void shutdown() throws InterruptedException; + + /** + * Schedules the given {@link Runnable} to be run at the specified time in + * the future on this {@link IDispatcher}. + * + * @param runnable + * The Runnable to execute + * @param delay + * The delay + * @param timeUnit + * The TimeUnit used to interpret delay. + */ + public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit); + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,453 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import java.util.LinkedList; +import java.util.TreeMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.dispatch.ExecutionLoadBalancer.LoadBalancedDispatchContext; +import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatchContext; +import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatcher; +import org.apache.activemq.queue.Mapper; +import org.apache.kahadb.util.LinkedNode; +import org.apache.kahadb.util.LinkedNodeList; + +public class PriorityDispatcher implements Runnable, PoolableDispatcher { + + private Thread thread; + private boolean running = false; + private boolean threaded = false; + private final int MAX_USER_PRIORITY; + + static final ThreadLocal dispatcher = new ThreadLocal(); + + private final ExecutionLoadBalancer loadBalancer; + + // The local dispatch queue: + private final PriorityLinkedList priorityQueue; + + // Dispatch queue for requests from other threads: + private final LinkedNodeList foreignQueue = new LinkedNodeList(); + + // Timed Execution List + private final TimerHeap timerHeap = new TimerHeap(); + + private final String name; + private final AtomicBoolean foreignAvailable = new AtomicBoolean(false); + private final Semaphore foreignPermits = new Semaphore(0); + + private final Mapper PRIORITY_MAPPER = new Mapper() { + public Integer map(PriorityDispatchContext element) { + return element.listPrio; + } + }; + + public PriorityDispatcher(String name, int priorities, ExecutionLoadBalancer loadBalancer) { + this.name = name; + MAX_USER_PRIORITY = priorities; + priorityQueue = new PriorityLinkedList(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER); + this.loadBalancer = loadBalancer; + loadBalancer.addDispatcher(this); + } + + private abstract class ForeignEvent extends LinkedNode { + public abstract void execute(); + + final void addToList() { + synchronized (foreignQueue) { + if (!this.isLinked()) { + foreignQueue.addLast(this); + if (!foreignAvailable.getAndSet(true)) { + foreignPermits.release(); + } + } + } + } + } + + public boolean isThreaded() { + return threaded; + } + + public void setThreaded(boolean threaded) { + this.threaded = threaded; + } + + private class UpdateEvent extends ForeignEvent { + private final PriorityDispatchContext pdc; + + UpdateEvent(PriorityDispatchContext pdc) { + this.pdc = pdc; + } + + // Can only be called by the owner of this dispatch context: + public void execute() { + pdc.lbContext.processForeignUpdates(); + } + } + + class PriorityDispatchContext extends LinkedNode implements PoolableDispatchContext { + // The dispatchable target: + final Dispatchable dispatchable; + LoadBalancedDispatchContext lbContext; + // The name of this context: + final String name; + // list prio can only be updated in the thread of of this dispatcher: + int listPrio; + // The update event is used to update fields in the dispatch context + // from foreign threads: + final UpdateEvent updateEvent = new UpdateEvent(this); + + // Marked by the caller when this is closed. + boolean closed = false; + + private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) { + super(); + this.dispatchable = dispatchable; + this.name = name; + } + + // The load balancer will guarantee that this is on our thread: + public final void requestDispatch() { + if (!isLinked()) { + priorityQueue.add(this, listPrio); + } + return; + } + + // The load balancer guarantees that this is called on our thread: + public final void updatePriority(int priority) { + if (priority != listPrio) { + + listPrio = priority; + // If there is a priority change relink the context + // at the new priority: + if (isLinked()) { + unlink(); + priorityQueue.add(this, listPrio); + } + } + return; + + } + + public void onForeignThreadUpdate() { + updateEvent.addToList(); + } + + // Will only be called on this thread: + public void close() { + if (isLinked()) { + unlink(); + } + synchronized (foreignQueue) { + if (updateEvent.isLinked()) { + updateEvent.unlink(); + } + } + + closed = true; + } + + /** + * This can only be called by the owning dispatch thread: + * + * @return False if the dispatchable has more work to do. + */ + public final boolean dispatch() { + return dispatchable.dispatch(); + } + + public String toString() { + return name; + } + + public Dispatchable getDispatchable() { + return dispatchable; + } + + public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context) { + this.lbContext = context; + } + + public String getName() { + return name; + } + + public PoolableDispatcher getDispatcher() { + return PriorityDispatcher.this; + } + } + + public DispatchContext register(Dispatchable dispatchable, String name) { + return loadBalancer.createLoadBalancedDispatchContext(createPoolablDispatchContext(dispatchable, name)); + } + + public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name) { + return new PriorityDispatchContext(dispatchable, true, name); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#start() + */ + public synchronized final void start() { + if (thread == null) { + running = true; + thread = new Thread(this, name); + thread.start(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#shutdown() + */ + public synchronized final void shutdown() throws InterruptedException { + if (thread != null) { + dispatch(new RunnableAdapter(new Runnable() { + + public void run() { + running = false; + } + + }), MAX_USER_PRIORITY + 1); + thread.interrupt(); + thread.join(); + thread = null; + } + } + + public void run() { + + // Inform the dispatcher that we have started: + loadBalancer.onDispatcherStarted(this); + dispatcher.set(this); + PriorityDispatchContext pdc; + try { + while (running) { + pdc = priorityQueue.poll(); + // If no local work available wait for foreign work: + if (pdc == null) { + foreignPermits.acquire(); + } else { + pdc.lbContext.startingDispatch(); + + while (!pdc.dispatch()) { + // If there is a higher priority dispatchable stop + // processing this one: + if (pdc.listPrio < priorityQueue.getHighestPriority()) { + // May have gotten relinked by the caller: + if (!pdc.isLinked()) { + priorityQueue.add(pdc, pdc.listPrio); + } + break; + } + } + + pdc.lbContext.finishedDispatch(); + + } + + // Execute delayed events: + timerHeap.executeReadyEvents(); + + // Check for foreign dispatch requests: + if (foreignAvailable.get()) { + synchronized (foreignQueue) { + // Drain the foreign queue: + while (true) { + ForeignEvent fe = foreignQueue.getHead(); + // TODO should probably swap foreign queue here: + if (fe == null) { + foreignAvailable.set(false); + foreignPermits.drainPermits(); + break; + } + + fe.unlink(); + fe.execute(); + } + } + } + } + } catch (InterruptedException e) { + return; + } catch (Throwable thrown) { + thrown.printStackTrace(); + } finally { + loadBalancer.onDispatcherStopped(this); + } + } + + class ThreadSafeDispatchContext implements LoadBalancedDispatchContext { + final PriorityDispatchContext delegate; + + ThreadSafeDispatchContext(PriorityDispatchContext context) { + this.delegate = context; + delegate.setLoadBalancedDispatchContext(this); + } + + public void finishedDispatch() { + // NOOP + + } + + public void startingDispatch() { + // Noop + + } + + public void close() { + // Noop this is always transient: + } + + public void processForeignUpdates() { + requestDispatch(); + } + + public Dispatchable getDispatchable() { + return delegate.getDispatchable(); + } + + public void requestDispatch() { + if (dispatcher.get() == PriorityDispatcher.this) { + delegate.requestDispatch(); + } else { + delegate.onForeignThreadUpdate(); + } + } + + public void updatePriority(int priority) { + throw new UnsupportedOperationException("Not implemented"); + } + + public String getName() { + return delegate.name; + } + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq + * .dispatch.Dispatcher.Dispatchable) + */ + final void dispatch(Dispatchable dispatchable, int priority) { + ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name)); + context.delegate.updatePriority(priority); + context.requestDispatch(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int) + */ + public Executor createPriorityExecutor(final int priority) { + + return new Executor() { + + public void execute(final Runnable runnable) { + dispatch(new RunnableAdapter(runnable), priority); + } + }; + } + + public void execute(final Runnable runnable) { + dispatch(new RunnableAdapter(runnable), 0); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable, + * long, java.util.concurrent.TimeUnit) + */ + public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) { + if (dispatcher.get() == this) { + timerHeap.add(runnable, delay, timeUnit); + } else { + new ForeignEvent() { + public void execute() { + timerHeap.add(runnable, delay, timeUnit); + } + }.addToList(); + } + } + + public String toString() { + return name; + } + + private class TimerHeap { + + final TreeMap> timers = new TreeMap>(); + + private void add(Runnable runnable, long delay, TimeUnit timeUnit) { + + long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS); + long eTime = System.nanoTime() + nanoDelay; + LinkedList list = new LinkedList(); + list.add(runnable); + + LinkedList old = timers.put(eTime, list); + if (old != null) { + list.addAll(old); + } + } + + private void executeReadyEvents() { + LinkedList ready = null; + if (timers.isEmpty()) { + return; + } else { + long now = System.nanoTime(); + long first = timers.firstKey(); + if (first > now) { + return; + } + ready = new LinkedList(); + + while (first < now) { + ready.addAll(timers.remove(first)); + if (timers.isEmpty()) { + break; + } + first = timers.firstKey(); + + } + } + + for (Runnable runnable : ready) { + try { + runnable.run(); + } catch (Throwable thrown) { + thrown.printStackTrace(); + } + } + } + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import java.util.ArrayList; + +import org.apache.activemq.queue.Mapper; +import org.apache.kahadb.util.LinkedNode; +import org.apache.kahadb.util.LinkedNodeList; + +public class PriorityLinkedList> { + + private Mapper priorityMapper; + private final ArrayList> priorityLists; + private int highesPriority = 0; + + public PriorityLinkedList(int numPriorities) { + this(numPriorities, null); + } + + public PriorityLinkedList(int numPriorities, Mapper priorityMapper) { + this.priorityMapper = priorityMapper; + priorityLists = new ArrayList>(); + for (int i = 0; i <= numPriorities; i++) { + priorityLists.add(new LinkedNodeList()); + } + } + + public final int getHighestPriority() { + return highesPriority; + } + + /** + * Gets the element at the front of the list: + * + * @return + */ + public final E poll() { + LinkedNodeList ll = getHighestPriorityList(); + if (ll == null) { + return null; + } + E node = ll.getHead(); + node.unlink(); + + return node; + } + + public final boolean isEmpty() { + return peek() != null; + } + + /** + * Gets the element at the front of the list: + * + * @return + */ + public final E peek() { + LinkedNodeList ll = getHighestPriorityList(); + if (ll == null) { + return null; + } + + return ll.getHead(); + } + + public final void add(E element) { + int prio = priorityMapper.map(element); + add(element, prio); + } + + public final void add(E element, int prio) { + LinkedNodeList ll = priorityLists.get(prio); + ll.addLast(element); + if (prio > highesPriority) { + highesPriority = prio; + } + } + + private final LinkedNodeList getHighestPriorityList() { + LinkedNodeList ll = priorityLists.get(highesPriority); + while (ll.isEmpty()) { + if (highesPriority == 0) { + return null; + } + highesPriority--; + ll = priorityLists.get(highesPriority); + } + + return ll; + } + + public Mapper getPriorityMapper() { + return priorityMapper; + } + + public void setPriorityMapper(Mapper priorityMapper) { + this.priorityMapper = priorityMapper; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import java.util.Arrays; + +public class PriorityMap { + + int first; + int base; + int size; + + Object elements[] = new Object[1]; + + public E put(int key, E value) { + E rc = null; + if (isEmpty()) { + // This will be the first base prioritu.. + base = key; + elements[0] = value; + first = 0; + } else { + if (key > base) { + // New priority is after the current base, we may need to + // expaned the + // priority array to fit this new one in. + int index = key - base; + if (elements.length <= index) { + // The funky thing is if the original base was removed, + // resizing + // will rebase the at the first. + resize(index + 1, 0); + } + if (index < first) { + first = index; + } + rc = element(index); + elements[index] = value; + } else { + // Ok this element is before the current base so we need to + // resize/rebase + // using this element as the base. + int oldLastIndex = indexOfLast(); + int newLastIndex = (base + oldLastIndex) - key; + resize(newLastIndex + 1, first + (base - key), (oldLastIndex - first) + 1); + elements[0] = value; + first = 0; + } + } + if (rc == null) { + size++; + } + return rc; + } + + private int indexOfLast() { + int i = elements.length - 1; + while (i >= 0) { + if (elements[i] != null) { + return i; + } + i--; + } + return -1; + } + + private void resize(int newSize, int firstOffset) { + int count = Math.min(elements.length - first, newSize); + resize(newSize, firstOffset, count); + } + + private void resize(int newSize, int firstOffset, int copyCount) { + Object t[]; + if (elements.length == newSize) { + t = elements; + System.arraycopy(elements, first, t, firstOffset, copyCount); + Arrays.fill(t, 0, firstOffset, null); + } else { + t = new Object[newSize]; + System.arraycopy(elements, first, t, firstOffset, copyCount); + } + base += (first - firstOffset); + elements = t; + } + + public E get(int priority) { + int index = priority - base; + if (index < 0 || index >= elements.length) { + return null; + } + return element(index); + } + + @SuppressWarnings("unchecked") + private E element(int index) { + return (E) elements[index]; + } + + public E remove(int priority) { + int index = priority - base; + if (index < 0 || index >= elements.length) { + return null; + } + E rc = element(index); + elements[index] = null; + if (rc != null) { + size--; + } + return rc; + } + + public boolean isEmpty() { + return size == 0; + } + + public E firstValue() { + if (size == 0) { + return null; + } + E rc = element(first); + while (rc == null) { + // The first element may have been removed so we need to find it... + first++; + rc = element(first); + } + return (E) rc; + } + + public Integer firstKey() { + if (size == 0) { + return null; + } + E rc = element(first); + while (rc == null) { + // The first element may have been removed so we need to find it... + first++; + rc = element(first); + } + return first; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import java.util.ArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PriorityPooledDispatcher implements IDispatcher { + private final String name; + + final AtomicBoolean started = new AtomicBoolean(); + final AtomicBoolean shutdown = new AtomicBoolean(); + + ArrayList dispatchers = new ArrayList(); + private int roundRobinCounter = 0; + private final int size; + + private final SimpleLoadBalancer executionGraphLoadBalancer; + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int) + */ + public Executor createPriorityExecutor(final int priority) { + return new Executor() { + public void execute(final Runnable runnable) { + chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0); + } + }; + } + + public PriorityPooledDispatcher(String name, int size, int priorities) { + this.name = name; + this.size = size; + executionGraphLoadBalancer = new SimpleLoadBalancer(name); + // Create all the workers. + for (int i = 0; i < size; i++) { + PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, executionGraphLoadBalancer); + dispatchers.add(dispatcher); + } + } + + public DispatchContext register(Dispatchable dispatchable, String name) { + return chooseDispatcher().register(dispatchable, name); + } + + /** + * @see org.apache.activemq.dispatch.IDispatcher#start() + */ + public synchronized final void start() { + if (started.compareAndSet(false, true)) { + // Create all the workers. + for (int i = 0; i < size; i++) { + dispatchers.get(i).start(); + } + } + try { + executionGraphLoadBalancer.start(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.dispatch.IDispatcher#shutdown() + */ + public synchronized final void shutdown() throws InterruptedException { + shutdown.set(true); + for (PriorityDispatcher dispatcher : dispatchers) { + dispatcher.shutdown(); + } + executionGraphLoadBalancer.shutdown(); + } + + private PriorityDispatcher chooseDispatcher() { + PriorityDispatcher d = PriorityDispatcher.dispatcher.get(); + if (d == null) { + synchronized (dispatchers) { + if (++roundRobinCounter >= size) { + roundRobinCounter = 0; + } + return dispatchers.get(roundRobinCounter); + } + } else { + return d; + } + } + + public void execute(final Runnable runnable) { + chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0); + } + + // TODO Implement + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable, + * long, java.util.concurrent.TimeUnit) + */ + public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) { + chooseDispatcher().schedule(runnable, delay, timeUnit); + } + + public String toString() { + return name; + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.dispatch; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.dispatch.IDispatcher.Dispatchable; + +/** + * + */ +public class SimpleLoadBalancer implements ExecutionLoadBalancer { + + private static final ThreadLocal dispatchContext = new ThreadLocal(); + private static final ThreadLocal dispatcher = new ThreadLocal(); + + private final ArrayList dispatchers = new ArrayList(); + + private final String name; + private final boolean DEBUG = false; + + SimpleLoadBalancer(String name) { + this.name = name; + } + + public void start() throws InterruptedException { + } + + public void shutdown() { + } + + public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context) { + ExecutionGraphNode egn = new ExecutionGraphNode(context); + return egn; + } + + public synchronized final void addDispatcher(PoolableDispatcher dispatcher) { + dispatchers.add(dispatcher); + } + + /** + * A Dispatcher must call this to indicate that is has started it's dispatch + * loop. + */ + public void onDispatcherStarted(PoolableDispatcher d) { + dispatcher.set(d); + } + + /** + * A Dispatcher must call this when exiting it's dispatch loop + */ + public void onDispatcherStopped(PoolableDispatcher d) { + + } + + private class ExecutionGraphEdge { + final ExecutionGraphNode target; + final ExecutionGraphNode source; + int count; + + ExecutionGraphEdge(ExecutionGraphNode source, ExecutionGraphNode target) { + this.target = target; + this.source = source; + } + + public String toString() { + return "Connection from: " + source + " to " + target; + } + } + + /** + * ExecutionGraphNode tracks dispatch information for a + * MappableDispatchContext. + * + */ + public class ExecutionGraphNode implements LoadBalancedDispatchContext { + protected PoolableDispatchContext context; + private ExecutionGraphNode singleSource; + private final HashMap sources = new HashMap(); + protected PoolableDispatcher currentOwner; + private final AtomicInteger work = new AtomicInteger(0); + + private int priority; + private boolean dispatchRequested = false; + private PoolableDispatcher updateDispatcher = null; + + ExecutionGraphNode(PoolableDispatchContext context) { + this.context = context; + this.context.setLoadBalancedDispatchContext(this); + this.currentOwner = context.getDispatcher(); + if (DEBUG) { + System.out.println(getName() + " Assigned to " + context.getDispatcher()); + } + } + + public final void startingDispatch() { + dispatchContext.set(this); + } + + public final void finishedDispatch() { + dispatchContext.set(null); + } + + /** + * This method is called to track which dispatch contexts are requesting + * dispatch for the target context represented by this node. + * + * This method is not threadsafe, the caller must ensure serialized + * access to this method. + * + * @param callingDispatcher + * The calling dispatcher. + * @return True if this method resulted in the dispatch request being + * assigned to another dispatcher. + */ + public final boolean onDispatchRequest(final PoolableDispatcher callingDispatcher) { + + /* + * if (callingDispatcher == currentOwner) { return false; } + */ + + ExecutionGraphNode callingContext = dispatchContext.get(); + if (callingContext != null) { + // Make sure we are being called by another node: + if (callingContext == null || callingContext == context) { + return false; + } + + // Optimize for single source case: + if (singleSource != callingContext) { + if (singleSource == null && sources.isEmpty()) { + singleSource = callingContext; + ExecutionGraphEdge edge = new ExecutionGraphEdge(callingContext, this); + sources.put(callingContext, edge); + + // If this context only has a single source + // immediately assign it to the + // dispatcher of the source: + boolean reassigned = false; + synchronized (this) { + if (callingDispatcher != currentOwner && updateDispatcher == null) { + updateDispatcher = callingDispatcher; + reassigned = true; + if (DEBUG) + System.out.println("Assigning: " + this + " to " + callingContext + "'s dispatcher: " + callingDispatcher); + + } + } + if (reassigned) { + assignToNewDispatcher(callingDispatcher); + } + return true; + } else { + + ExecutionGraphEdge stats = sources.get(callingContext); + if (stats == null) { + stats = new ExecutionGraphEdge(callingContext, this); + sources.put(callingContext, stats); + } + + if (singleSource != null) { + singleSource = null; + } + } + } + work.incrementAndGet(); + } + return false; + } + + final void assignToNewDispatcher(PoolableDispatcher newDispatcher) { + synchronized (this) { + if (newDispatcher != currentOwner) { + updateDispatcher = newDispatcher; + } + } + context.onForeignThreadUpdate(); + } + + public void requestDispatch() { + + PoolableDispatcher callingDispatcher = dispatcher.get(); + + if (onDispatchRequest(callingDispatcher)) { + return; + } + + // Otherwise this is coming off another thread, so we need to + // synchronize + // to protect against ownership changes: + synchronized (this) { + // If the owner of this context is the calling thread, then + // delegate to the dispatcher. + if (currentOwner == callingDispatcher) { + + context.requestDispatch(); + return; + } + + dispatchRequested = true; + } + context.onForeignThreadUpdate(); + } + + public void updatePriority(int priority) { + if (this.priority == priority) { + return; + } + // Otherwise this is coming off another thread, so we need to + // synchronize + // to protect against ownership changes: + synchronized (this) { + this.priority = priority; + + IDispatcher callingDispatcher = dispatcher.get(); + + // If the owner of this context is the calling thread, then + // delegate to the dispatcher. + if (currentOwner == callingDispatcher) { + + context.updatePriority(priority); + return; + } + } + context.onForeignThreadUpdate(); + } + + public void processForeignUpdates() { + boolean ownerChange = false; + synchronized (this) { + if (updateDispatcher != null) { + // Close the old context: + if (DEBUG) { + System.out.println("Assigning " + getName() + " to " + updateDispatcher); + } + context.close(); + + currentOwner = updateDispatcher; + updateDispatcher = null; + context = currentOwner.createPoolablDispatchContext(context.getDispatchable(), context.getName()); + dispatchRequested = true; + context.updatePriority(priority); + context.setLoadBalancedDispatchContext(this); + ownerChange = true; + } else { + context.updatePriority(priority); + + if (dispatchRequested) { + context.requestDispatch(); + dispatchRequested = false; + } + } + } + + if (ownerChange) { + context.onForeignThreadUpdate(); + } + } + + public void close() { + sources.clear(); + } + + public final String toString() { + return context.toString(); + } + + public Dispatchable getDispatchable() { + return context.getDispatchable(); + } + + public String getName() { + return context.getName(); + } + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import java.util.HashMap; +import java.util.HashSet; + +public abstract class AbstractLimitedFlowResource implements IFlowResource { + private final HashSet lifeCycleWatchers = new HashSet(); + private final HashMap> openControllers = new HashMap>(); + + private final long resourceId = RESOURCE_COUNTER.incrementAndGet(); + + private String resourceName; + + protected AbstractLimitedFlowResource() { + + } + + protected AbstractLimitedFlowResource(String name) { + this.resourceName = name; + } + + public long getResourceId() { + return resourceId; + } + + public String getResourceName() { + return resourceName; + } + + protected void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + public synchronized final void addFlowLifeCycleListener(FlowLifeCycleListener listener) { + lifeCycleWatchers.add(listener); + // Notify the watchers of all flows that are already open: + for (FlowController controller : openControllers.values()) { + listener.onFlowOpened(this, controller.getFlow()); + } + } + + /** + * Subclasses must call this whenever a new {@link ISinkController} is + * opened. + * + * @param controller + * The new controller. + */ + protected synchronized final void onFlowOpened(FlowController controller) { + FlowController existing = openControllers.put(controller.getFlow(), controller); + if (existing != null && existing != controller) { + // Put the existing controller back: + openControllers.put(controller.getFlow(), existing); + throw new IllegalStateException("Flow already opened" + existing); + } + + for (FlowLifeCycleListener listener : lifeCycleWatchers) { + listener.onFlowOpened(this, controller.getFlow()); + } + } + + protected synchronized final void onFlowClosed(Flow flow) { + FlowController existing = openControllers.remove(flow); + + if (existing != null) { + for (FlowLifeCycleListener listener : lifeCycleWatchers) { + listener.onFlowClosed(this, existing.getFlow()); + } + } + } + + public synchronized void removeFlowLifeCycleListener(FlowLifeCycleListener listener) { + lifeCycleWatchers.remove(listener); + } + + /** + * Gets the flow controller corresponding to the specified flow. + * + * @param flow + * The flow + * @return The FlowController + */ + public synchronized FlowController getFlowController(Flow flow) { + return openControllers.get(flow); + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +public abstract class AbstractLimitedFlowSource extends AbstractLimitedFlowResource { + + protected boolean autoRelease = false; + protected IFlowDrain drain; + + protected AbstractLimitedFlowSource() { + super(); + } + + protected AbstractLimitedFlowSource(String name) { + super(name); + } + + /** + * Can be set to automatically release space on dequeue. When set the caller + * does not need to call IFlowController.elementDispatched() + * + * @param val + */ + public synchronized void setAutoRelease(boolean val) { + autoRelease = val; + } + + /** + * Returns whether or not this {@link IFlowSource} is set to automatically + * release elements via {@link FlowController#elementDispatched(Object) + * during dispatch. When auto release is set the caller must not call + * {@link FlowController#elementDispatched(Object). + */ + public synchronized boolean getAutoRelease() { + return autoRelease; + } + + public synchronized void setDrain(IFlowDrain drain) { + + this.drain = drain; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import java.util.LinkedList; + +public abstract class AbstractLimiter implements IFlowLimiter { + + private LinkedList throttleListeners = new LinkedList(); + private boolean resuming; + + public AbstractLimiter() { + } + + public final void addUnThrottleListener(UnThrottleListener l) { + throttleListeners.add(l); + + if (!resuming && !getThrottled()) { + notifyUnThrottleListeners(); + } + } + + public final void notifyUnThrottleListeners() { + resuming = true; + while (!getThrottled() && !throttleListeners.isEmpty()) { + UnThrottleListener l = throttleListeners.remove(); + l.onUnthrottled(); + } + resuming = false; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import java.util.concurrent.atomic.AtomicLong; + +/** + */ +final public class Flow { + static final private AtomicLong FLOW_COUNTER = new AtomicLong(); + + final private long flowID; + final private String flowName; + final private boolean dynamic; + final private int hashCode; + + /** + * Package scoped constructor. + * + * @param name + * The flow name. + * @param id + * The flow id. + */ + public Flow(String name, boolean dynamic) { + this.flowID = FLOW_COUNTER.incrementAndGet(); + this.flowName = name; + this.dynamic = dynamic; + this.hashCode = (int) (flowID ^ (flowID >>> 32)); + } + + /** + * @see org.apache.activemq.flow.Flow#getFlowID() + */ + public long getFlowID() { + return flowID; + } + + /** + * @see Flow#getFlowName() + */ + public String getFlowName() { + return flowName; + } + + /** + * @see Flow#isDynamic() + */ + public boolean isDynamic() { + return dynamic; + } + + public int hashCode() { + return hashCode; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o instanceof Flow) { + return equals((Flow) o); + } + + return false; + } + + public boolean equals(Flow flow) { + return flowID == flow.getFlowID(); + } + + public String toString() { + return getFlowName(); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.apache.activemq.flow.IFlowLimiter.UnThrottleListener; + +/** + */ +public class FlowController implements ISinkController, ISourceController { + + // Sinks that are blocking us. + private final HashSet> blockingSinks = new HashSet>(); + + // Holds the sources that this limiter is currently blocking + private final HashSet> blockedSources = new HashSet>(); + + // Holds the sources that this limiter is currently blocking + private final HashSet> unblockListeners = new HashSet>(); + + // Callback for the IFlowLimiter to notify us that it is unthrottled: + private final UnThrottleListener unthrottleListener; + + // The flow being flow controlled: + protected Flow flow; + + // Used to synchronize access to this controller by downstream sinks: + // private final ReentrantLock sinkLock = new + // java.util.concurrent.locks.ReentrantLock(); + + // True if the flow is blocked: + protected boolean blocked = false; + + // Set to true while resuming. New Sources must wait to enqueue while + // old sources are being resumed. + private boolean resuming = false; + + // Marks that we have scheduled a resume + private boolean resumeScheduled = false; + + // The acceptor for elements from this flow. + private FlowControllable controllable; + + // The limiter + private IFlowLimiter limiter; + + // Mutex for synchronization + private Object mutex; + + private boolean useOverFlowQueue = true; + + // List of elements that were added while the flow is blocked. + // These aren't added to the resource until the flow becomes + // unblocked. + private LinkedList overflowQueue = new LinkedList(); + + // true we registered as an unthrottle listener: + private boolean throttleReg; + private boolean notifyUnblock = false; + private String name; + + public FlowController() { + this.unthrottleListener = new UnThrottleListener() { + public final void onUnthrottled() { + FlowController.this.onUnthrottled(); + } + }; + } + + public FlowController(FlowControllable controllable, Flow flow, IFlowLimiter limiter, Object mutex) { + this(); + this.controllable = controllable; + this.flow = flow; + this.limiter = limiter == null ? new SizeLimiter(0, 0) : limiter; + this.mutex = mutex; + this.name = controllable.getFlowSource().toString(); + } + + public final IFlowLimiter getLimiter() { + return limiter; + } + + public final void setLimiter(IFlowLimiter limiter) { + synchronized (mutex) { + this.limiter = limiter; + onUnthrottled(); + } + } + + /** + * Sets whether the controller uses an overflow queue to prevent overflow. + */ + public final void useOverFlowQueue(boolean val) { + useOverFlowQueue = val; + } + + public final Flow getFlow() { + return flow; + } + + /** + * Should be called by a resource anytime it's limits are exceeded. + */ + public final void onFlowBlock(ISinkController sinkController) { + synchronized (mutex) { + if (!blockingSinks.add(sinkController)) { + throw new IllegalStateException(sinkController + " has already blocked: " + this); + } + + if (!blocked) { + blocked = true; + // System.out.println(this + " BLOCKED"); + } + } + } + + public final void onFlowResume(ISinkController sinkController) { + synchronized (mutex) { + if (!blockingSinks.remove(sinkController)) { + throw new IllegalStateException(sinkController + " can't resume unblocked " + this); + } + + if (blockingSinks.isEmpty()) { + if (blocked) { + blocked = false; + limiter.releaseReserved(); + } + } + } + } + + /** + * Must be called once the elements have been sent to downstream sinks. + * + * @param elem + * The dispatched element. + * @return + */ + public final void elementDispatched(E elem) { + + synchronized (mutex) { + // If we were blocked in the course of dispatching the message don't + // decrement + // the limiter space: + if (blocked) { + limiter.reserve(elem); + return; + } + limiter.remove(elem); + } + } + + public final boolean isSinkBlocked() { + synchronized (mutex) { + return limiter.getThrottled(); + } + } + + public final boolean isSourceBlocked() { + synchronized (mutex) { + return blocked; + } + } + + /** + * Waits for a flow to become unblocked. + * + * @param flow + * The flow. + * @throws InterruptedException + * If interrupted while waiting. + */ + public void waitForFlowUnblock() throws InterruptedException { + synchronized (mutex) { + while (limiter.getThrottled()) { + notifyUnblock = true; + setUnThrottleListener(); + mutex.wait(); + } + notifyUnblock = false; + } + } + + public IFlowSource getFlowSource() { + return controllable.getFlowSource(); + } + + /** + * Adds an element to the sink associated with this resource if space is + * available. If no space is available the source controller will be + * blocked, and the source is responsible for tracking the space until this + * controller resumes. + * + * @param elem + * The element to add. + * @param controller + * the source flow controller. + */ + public void add(E elem, ISourceController sourceController) { + boolean ok = false; + synchronized (mutex) { + // If we don't have an fc sink, then just increment the limiter. + if (controllable == null) { + limiter.add(elem); + return; + } + if (okToAdd(elem)) { + ok = true; + if (limiter.add(elem)) { + blockSource(sourceController); + setUnThrottleListener(); + } + } else { + // Add to overflow queue and block source: + overflowQueue.add(elem); + blockSource(sourceController); + setUnThrottleListener(); + } + } + if (ok) { + controllable.flowElemAccepted(this, elem); + } + } + + /** + * Offers an element to the sink associated with this resource if space is + * available. If no space is available false is returned. The element does + * not get added to the overflow list. + * + * @param elem + * The element to add. + * @param controller + * the source flow controller. + */ + public boolean offer(E elem, ISourceController sourceController) { + synchronized (mutex) { + // If we don't have an fc sink, then just increment the limiter. + if (controllable == null) { + limiter.add(elem); + return true; + } + + if (okToAdd(elem)) { + if (limiter.add(elem)) { + blockSource(sourceController); + setUnThrottleListener(); + } + controllable.flowElemAccepted(this, elem); + return true; + } else { + blockSource(sourceController); + setUnThrottleListener(); + return false; + } + } + } + + private boolean okToAdd(E elem) { + return !useOverFlowQueue || limiter.canAdd(elem); + } + + private void addToResource(E elem) { + if (limiter != null) + limiter.add(elem); + if (controllable != null) + controllable.flowElemAccepted(this, elem); + } + + private void setUnThrottleListener() { + if (!throttleReg) { + throttleReg = true; + limiter.addUnThrottleListener(unthrottleListener); + } + } + + public void onUnthrottled() { + synchronized (mutex) { + throttleReg = false; + dispatchOverflowQueue(); + } + } + + /** + * Blocks a source. + * + * @param source + * The {@link ISinkController} of the source to be blocked. + */ + protected void blockSource(final ISourceController source) { + if (source == null) { + return; + } + + // If we are currently in the process of resuming we + // must wait for resume to complete, before we add to + // the blocked list: + waitForResume(); + + if (!blockedSources.contains(source)) { + // System.out.println("BLOCKING : SINK["+this + "], SOURCE[" + + // source+"]"); + blockedSources.add(source); + source.onFlowBlock(this); + } + } + + private void dispatchOverflowQueue() { + + // Dispatch elements on the blocked list into the limited resource + while (!overflowQueue.isEmpty()) { + E elem = overflowQueue.getFirst(); + if (limiter.canAdd(elem)) { + overflowQueue.removeFirst(); + addToResource(elem); + } else { + break; + } + } + + // See if we can now unblock the sources. + checkUnblockSources(); + + // If we've exceeded the the throttle threshold, register + // a listener so we can resume the blocked sources after + // the limiter falls below the threshold: + if (!overflowQueue.isEmpty()) { + setUnThrottleListener(); + } else if (notifyUnblock) { + mutex.notifyAll(); + } + } + + /** + * Called to wait for a flow to become unblocked. + * + * @param listener + * The listener. + * @return true; + */ + public final boolean addUnblockListener(FlowUnblockListener listener) { + synchronized (mutex) { + waitForResume(); + if (limiter.getThrottled() || !overflowQueue.isEmpty()) { + unblockListeners.add(listener); + return true; + } + } + return false; + } + + private final void waitForResume() { + boolean interrupted = false; + while (resuming) { + try { + mutex.wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + /** + * Releases blocked sources providing the limiter isn't throttled, and there + * are no elements on the blocked list. + */ + private void checkUnblockSources() { + if (!resumeScheduled && !limiter.getThrottled() && overflowQueue.isEmpty() && (!blockedSources.isEmpty() || !unblockListeners.isEmpty())) { + resumeScheduled = true; + Runnable resume = new Runnable() { + public void run() { + synchronized (mutex) { + resuming = true; + } + String was = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(name); + for (ISourceController source : blockedSources) { + // System.out.println("UNBLOCKING: SINK["+FlowController.this + // + "], SOURCE[" + source+"]"); + source.onFlowResume(FlowController.this); + } + for (FlowUnblockListener listener : unblockListeners) { + // System.out.println(this + "Unblocking source " + + // source ); + listener.onFlowUnblocked(FlowController.this); + } + + } finally { + synchronized (mutex) { + blockedSources.clear(); + unblockListeners.clear(); + resuming = false; + resumeScheduled = false; + mutex.notifyAll(); + } + Thread.currentThread().setName(was); + } + } + }; + + RESUME_SERVICE.execute(resume); + } + } + + private static Executor RESUME_SERVICE = Executors.newCachedThreadPool(); + + public static final void setFlowExecutor(Executor executor) { + RESUME_SERVICE = executor; + } + + public String toString() { + return name; + } + + public IFlowSink getFlowSink() { + return controllable.getFlowSink(); + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +/** + * Defines an interface for draining a flow source. + * + * @param + */ +public interface IFlowDrain { + + /** + * Used by a FlowSource that is being dispatched to drain it's elements. + * The implementor is responsible for calling {@link ISourceController#elementDispatched(Object) + * when the element has been dispatched to all downstream sinks if IFlowSource + * @param elem + * @param controller + */ + public void drain(E elem, ISourceController controller); +}