Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 25166 invoked from network); 11 Feb 2009 20:13:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Feb 2009 20:13:00 -0000 Received: (qmail 27912 invoked by uid 500); 11 Feb 2009 20:13:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 27853 invoked by uid 500); 11 Feb 2009 20:13:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 27844 invoked by uid 99); 11 Feb 2009 20:13:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2009 12:12:59 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2009 20:12:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9A0882388B03; Wed, 11 Feb 2009 20:12:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r743476 [2/4] - in /activemq/sandbox/activemq-flow: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/act... Date: Wed, 11 Feb 2009 20:12:30 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090211201232.9A0882388B03@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +public interface IFlowLimiter { + public interface UnThrottleListener { + // Called when a throttled limiter falls below the throttle + // threshold. + public void onUnthrottled(); + } + + public void reserve(E elem); + + public void releaseReserved(); + + public boolean canAdd(E elem); + + /** + * Adds an element to the limiter, returning true if this results in the + * limiter being throttled. + * + * @param elem + * The element to add. + * @return True if this triggered throttling. + */ + public boolean add(E elem); + + public void remove(E elem); + + public boolean getThrottled(); + + public void addUnThrottleListener(UnThrottleListener listener); +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import java.util.concurrent.atomic.AtomicLong; + +public interface IFlowResource { + + /** + * A listener for when new flows are opened and closed for this resource. + */ + public interface FlowLifeCycleListener { + public void onFlowOpened(IFlowResource source, Flow flow); + + public void onFlowClosed(IFlowResource source, Flow flow); + } + + /** + * Adds a {@link FlowLifeCycleListener} for flows opened by the source. + * + * @param listener + * The listener. + */ + public void addFlowLifeCycleListener(FlowLifeCycleListener listener); + + /** + * Called to remove a previously added {@link FlowLifeCycleListener}. + * + * @param listener + * The listener. + */ + public void removeFlowLifeCycleListener(FlowLifeCycleListener listener); + + static final public AtomicLong RESOURCE_COUNTER = new AtomicLong(); + + /** + * Gets the unique resource id associated with this resource + * + * @retrun the unique resource id. + */ + public long getResourceId(); + + public String getResourceName(); +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +public interface IFlowSink extends IFlowResource { + /** + * Adds an element to the sink. If limiter space in the sink is overflowed + * by the element then it will block the source controller. + * + * @param elem + * The element to add to the sink. + * @param source + * The source's flow controller. + */ + public void add(E elem, ISourceController source); + + /** + * Offers an element to the sink. If there is no room available the source's + * controller will be blocked and the element will not be added. + * + * @param elem + * The element to offer + * @param source + * The source's controller. + * @return false if the element wasn't accepted. + */ + public boolean offer(E elem, ISourceController source); +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +/** + * + */ +public interface IFlowSource extends IFlowResource { + + /** + * Gets the {@link ISourceController} for the specified flow. + * + * @param flow + * The flow. + * @return The flow controller for the specified flow. + */ + public FlowController getFlowController(Flow flow); + + /** + * If set to true the source will automatically release limiter space + * associated with {@link IFlowElem}s as they are dispacthed. If set to + * false then the {@link IFlowDrain} must release space via a call to + * {@link ISourceController#elementDispatched(IFlowElem)}. + * + * @param autoRelease + * If the source should release limiter space for elements. + */ + public void setAutoRelease(boolean autoRelease); + +/** + * Returns whether or not this {@link IFlowSource} is set to automatically + * release elements via {@link FlowController#elementDispatched(Object) during + * dispatch. When auto release is set the caller must not call + * {@link FlowController#elementDispatched(Object). + * + * @return true if auto release is set, false otherwise. + */ + public boolean getAutoRelease(); + + /** + * Sets the default drain for elements from this flow source. It will be + * invoked to dispatch elements from the source. + * + * @param drain + * The drain. + */ + public void setDrain(IFlowDrain drain); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +public interface ISinkController { + /** + * Defines required attributes for an entity that can be flow controlled. + * + * @param + */ + public interface FlowControllable { + public void flowElemAccepted(ISourceController controller, E elem); + + public IFlowSink getFlowSink(); + + public IFlowSource getFlowSource(); + } + + /** + * Used to get a notification when a blocked controller becomes unblocked + * + * @param + */ + public interface FlowUnblockListener { + public void onFlowUnblocked(ISinkController controller); + } + + /** + * Offers an element to the sink associated with this resource if space is + * available. If no space is available false is returned. The element does + * not get added to the overflow list. + * + * @param elem + * The element to add. + * @param controller + * the source flow controller. + */ + public boolean offer(E elem, ISourceController sourceController); + + /** + * Adds an element to the sink associated with this resource if space is + * available. If no space is available the source controller will be + * blocked, and the source is responsible for tracking the space until this + * controller resumes. + * + * @param elem + * The element to add. + * @param controller + * the source flow controller. + */ + public void add(E elem, ISourceController controller); + + /** + * Called to check if this FlowController is currently being blocked + * + * @return True if the flow is blocked. + */ + public boolean isSinkBlocked(); + + /** + * Waits for a flow to become unblocked. + * + * @param flow + * The flow. + * @throws InterruptedException + * If interrupted while waiting. + */ + public void waitForFlowUnblock() throws InterruptedException; + + /** + * Sets a callback for the listener if this controller is currently blocked. + * + * @param listener + * The listener. + * @return True if a listener was registered false otherwise. + */ + public boolean addUnblockListener(FlowUnblockListener listener); + + public IFlowSink getFlowSink(); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +/** + * The control interface to a source. Sinks typically call back to a + * ISourceController to suspend or resumed the dispatching of messages. + * + * @param + */ +public interface ISourceController { + + /** + * Returns the source that this FlowController is controlling. + * + * @return The source that the flow controller is controlling. + */ + public IFlowSource getFlowSource(); + + /** + * Gets the flow that this controller is controlling. + * + * @return + */ + public Flow getFlow(); + + /** + * This is called when a particular flow is blocked for a resource + * + * @param sink + * The sink blocking this source + */ + public void onFlowBlock(ISinkController sink); + + /** + * Callback used with FlowControllers to get a notification that an + * IFlowController has been resumed. + * + * @param controller + * The IFlowController that was unblocked. + */ + public void onFlowResume(ISinkController sink); + + public boolean isSourceBlocked(); + + /** + * Must be called once the elements have been sent to downstream sinks. + * + * @param elem + * The dispatched element. + * @return + */ + public void elementDispatched(E elem); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +public class NoOpFlowController implements ISinkController { + private final IFlowSource source; + private final Flow flow; + + public NoOpFlowController(IFlowSource source, Flow flow) { + this.source = source; + this.flow = flow; + } + + public IFlowSource getFlowSource() { + return source; + } + + public Flow getFlow() { + // TODO Auto-generated method stub + return flow; + } + + public boolean isSinkBlocked() { + return false; + } + + public void onFlowBlock(IFlowSink sink) { + // Noop + } + + public void onFlowResume(IFlowSink sink) { + // Noop + } + + /** + * Must be called once the elements have been sent to downstream sinks. + * + * @param elem + * The dispatched element. + * @return + */ + public void elementDispatched(E elem) { + // No op for basic flow controller + } + + public String toString() { + return "DISABLED Flow Controller for: " + source; + } + + public boolean offer(E elem, ISourceController sourceController) { + throw new UnsupportedOperationException(); + } + + public void add(E elem, ISourceController controller) { + throw new UnsupportedOperationException(); + } + + public void waitForFlowUnblock() throws InterruptedException { + // TODO Auto-generated method stub + + } + + /** + * Sets a callback for the listener if this controller is currently blocked. + * + * @param listener + * The listener. + * @return True if a listener was registered false otherwise. + */ + public boolean addUnblockListener(FlowUnblockListener listener) { + return false; + } + + public IFlowSink getFlowSink() { + return null; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import org.apache.activemq.queue.Mapper; + +public class PriorityFlowController implements ISourceController, ISinkController { + + private final Object mutex; + private final FlowController controllers[]; + private final PrioritySizeLimiter limiter; + + private Mapper priorityMapper; + + private final Flow flow; + private final FlowControllable controllable; + + public PriorityFlowController(int priorities, FlowControllable controllable, Flow flow, Object mutex, int capacity, int resume) { + this.controllable = controllable; + this.flow = flow; + this.mutex = mutex; + this.limiter = new PrioritySizeLimiter(capacity, resume, priorities); + this.limiter.setPriorityMapper(priorityMapper); + this.controllers = createControlerArray(priorities); + for (int i = 0; i < priorities; i++) { + this.controllers[i] = new FlowController(controllable, flow, limiter.getPriorityLimter(i), mutex); + } + } + + @SuppressWarnings("unchecked") + private FlowController[] createControlerArray(int priorities) { + return new FlowController[priorities]; + } + + // ///////////////////////////////////////////////////////////////// + // ISinkController interface impl. + // ///////////////////////////////////////////////////////////////// + + public boolean offer(E elem, ISourceController controller) { + int prio = priorityMapper.map(elem); + return controllers[prio].offer(elem, controller); + } + + public void add(E elem, ISourceController controller) { + int prio = priorityMapper.map(elem); + controllers[prio].add(elem, controller); + } + + public boolean isSinkBlocked() { + synchronized (mutex) { + return limiter.getThrottled(); + } + } + + public boolean addUnblockListener(org.apache.activemq.flow.ISinkController.FlowUnblockListener listener) { + boolean rc = false; + for (int i = 0; i < controllers.length; i++) { + rc |= this.controllers[i].addUnblockListener(listener); + } + return rc; + } + + public void waitForFlowUnblock() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + // ///////////////////////////////////////////////////////////////// + // ISourceController interface impl. + // ///////////////////////////////////////////////////////////////// + + public void elementDispatched(E elem) { + FlowController controler = controllers[priorityMapper.map(elem)]; + controler.elementDispatched(elem); + } + + public Flow getFlow() { + return flow; + } + + public IFlowSource getFlowSource() { + return controllable.getFlowSource(); + } + + public void onFlowBlock(ISinkController sink) { + for (int i = 0; i < controllers.length; i++) { + controllers[i].onFlowBlock(sink); + } + } + + public void onFlowResume(ISinkController sink) { + for (int i = 0; i < controllers.length; i++) { + controllers[i].onFlowBlock(sink); + } + } + + public boolean isSourceBlocked() { + return false; + } + + // ///////////////////////////////////////////////////////////////// + // Getters and Setters + // ///////////////////////////////////////////////////////////////// + + public Mapper getPriorityMapper() { + return priorityMapper; + } + + public void setPriorityMapper(Mapper priorityMapper) { + this.priorityMapper = priorityMapper; + limiter.setPriorityMapper(priorityMapper); + } + + public IFlowSink getFlowSink() { + return controllable.getFlowSink(); + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +import java.util.ArrayList; + +import org.apache.activemq.queue.Mapper; + +public class PrioritySizeLimiter { + + final private ArrayList priorities = new ArrayList(); + final protected int capacity; + final protected int resumeThreshold; + + private int totalSize; + private int throttledCount = 0; + private int highestPriority; + + private Mapper sizeMapper = new Mapper() { + public Integer map(E element) { + return 1; + } + }; + + private Mapper priorityMapper = sizeMapper; + + private class Priority extends AbstractLimiter { + final int priority; + int size; + int reserved; + private boolean throttled; + + public Priority(int priority) { + this.priority = priority; + } + + public boolean add(E elem) { + int elementSize = sizeMapper.map(elem); + totalSize += elementSize; + size += elementSize; + if (totalSize >= capacity) { + if (!throttled) { + throttled = true; + throttledCount++; + } + } + if (priority >= highestPriority) { + highestPriority = priority; + } + return throttled; + } + + public boolean canAdd(E elem) { + if (throttled) + return false; + + int prio = priorityMapper.map(elem); + if (prio < highestPriority) { + if (!throttled) { + throttled = true; + throttledCount++; + } + return false; + } + + return true; + } + + public boolean getThrottled() { + return throttled; + } + + public void releaseReserved() { + if (reserved > 0) { + int res = reserved; + reserved = 0; + remove(res); + } + } + + public void remove(E elem) { + int size = sizeMapper.map(elem); + remove(size); + } + + protected void remove(int s) { + size -= s; + totalSize -= s; + + assert size >= 0 : "Negative limiter size: " + size; + assert totalSize >= 0 : "Negative limiter total size:" + totalSize; + + if (totalSize <= resumeThreshold) { + priorities.get(highestPriority).unThrottle(); + } + } + + public void unThrottle() { + if (throttled) { + throttled = false; + throttledCount--; + notifyUnThrottleListeners(); + + // Has the highest priority level emptied out? + if (size == 0 && priority == highestPriority) { + // Set highestPriority to the new highest priority level + highestPriority = 0; + for (int i = priority - 1; i >= 0; i--) { + Priority p = priorities.get(i); + if (p.size > 0 || p.throttled) { + highestPriority = i; + if (totalSize <= resumeThreshold) { + p.unThrottle(); + } + } + } + } + } + } + + public void reserve(E elem) { + reserved += sizeMapper.map(elem); + } + } + + public PrioritySizeLimiter(int capacity, int resumeThreshold, int priorities) { + this.capacity = capacity; + this.resumeThreshold = resumeThreshold; + for (int i = 0; i < priorities; i++) { + this.priorities.add(new Priority(i)); + } + } + + public IFlowLimiter getPriorityLimter(int priority) { + return priorities.get(priority); + } + + public Mapper getSizeMapper() { + return sizeMapper; + } + + public void setSizeMapper(Mapper sizeMapper) { + this.sizeMapper = sizeMapper; + } + + public Mapper getPriorityMapper() { + return priorityMapper; + } + + public void setPriorityMapper(Mapper priorityMapper) { + this.priorityMapper = priorityMapper; + } + + public boolean getThrottled() { + return throttledCount > 0; + } + + public int getPriorities() { + return priorities.size(); + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.flow; + +public class SizeLimiter extends AbstractLimiter { + + protected int capacity; + protected int resumeThreshold; + + private int size; + private boolean throttled; + private int reserved; + + public SizeLimiter(int capacity, int resumeThreshold) { + this.capacity = capacity; + throttled = false; + this.resumeThreshold = resumeThreshold; + } + + public final boolean add(E elem) { + this.size += getElementSize(elem); + + if (this.size >= capacity) { + throttled = true; + } + return throttled; + } + + public final void remove(E elem) { + remove(getElementSize(elem)); + } + + public void reserve(E elem) { + reserved += getElementSize(elem); + } + + public void releaseReserved() { + if (reserved > 0) { + int res = reserved; + reserved = 0; + remove(res); + } + } + + protected void remove(int s) { + this.size -= s; + if (size < 0) { + Exception ie = new IllegalStateException("Size Negative!" + size); + ie.printStackTrace(); + } + + if (throttled && this.size <= resumeThreshold) { + throttled = false; + notifyUnThrottleListeners(); + } + } + + public final void reset() { + size = 0; + notifyUnThrottleListeners(); + } + + /** + * Subclasses should override to return the size of an element + * + * @param elem + */ + public int getElementSize(E elem) { + return 1; + } + + public final boolean getThrottled() { + return throttled; + } + + public boolean canAdd(E elem) { + return !throttled; + } + + public int getCapacity() { + return capacity; + } + + public int getResumeThreshold() { + return resumeThreshold; + } + + public int getSize() { + return size; + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.metric; + +abstract public class Metric { + + private String name; + private String unit = "items"; + + public Metric name(String name) { + this.name = name; + return this; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public abstract long counter(); + + public Metric unit(String unit) { + this.unit = unit; + return this; + } + + public String getUnit() { + return unit; + } + + public void setUnit(String unit) { + this.unit = unit; + } + + public String getRateSummary(Period period) { + return String.format("%s: %(,.2f %s/s", name, period.rate(counter()), unit); + } + + abstract public void reset(); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.metric; + +import java.util.ArrayList; + +public class MetricAggregator extends Metric { + + ArrayList metrics = new ArrayList(); + + public MetricAggregator name(String name) { + return (MetricAggregator) super.name(name); + } + + public MetricAggregator unit(String unit) { + return (MetricAggregator) super.unit(unit); + } + + public void add(Metric metric) { + metrics.add(metric); + if (getUnit() != null) { + metric.setUnit(getUnit()); + } + } + + public boolean remove(Metric metric) { + return metrics.remove(metric); + } + + public Float average() { + if (metrics.isEmpty()) { + return null; + } + long rc = 0; + int count = 0; + for (Metric metric : metrics) { + rc += metric.counter(); + count++; + } + return rc * 1.0f / count; + } + + public long total() { + long rc = 0; + for (Metric metric : metrics) { + rc += metric.counter(); + } + return rc; + } + + public Long min() { + if (metrics.isEmpty()) { + return null; + } + long rc = Long.MAX_VALUE; + for (Metric metric : metrics) { + long t = metric.counter(); + if (t < rc) { + rc = t; + } + } + return rc; + } + + public Long max() { + if (metrics.isEmpty()) { + return null; + } + long rc = Long.MIN_VALUE; + for (Metric metric : metrics) { + long t = metric.counter(); + if (t > rc) { + rc = t; + } + } + return rc; + } + + @Override + public long counter() { + return total(); + } + + public String getRateSummary(Period period) { + return String + .format("%s: total=%(,.2f, avg=%(,.2f, min=%(,.2f, max=%(,.2f in %s/s", getName(), period.rate(total()), period.rate(average()), period.rate(min()), period.rate(max()), getUnit()); + } + + public String getChildRateSummary(Period period) { + StringBuilder rc = new StringBuilder(); + rc.append("{\n"); + for (Metric metric : metrics) { + rc.append(" "); + rc.append(metric.getRateSummary(period)); + rc.append("\n"); + } + rc.append("}"); + return rc.toString(); + } + + @Override + public void reset() { + for (Metric metric : metrics) { + metric.reset(); + } + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.metric; + +import java.util.concurrent.atomic.AtomicLong; + +public class MetricCounter extends Metric { + + AtomicLong counter = new AtomicLong(); + + public MetricCounter name(String name) { + return (MetricCounter) super.name(name); + } + + public final long increment(long delta) { + return counter.addAndGet(delta); + } + + public final long increment() { + return counter.incrementAndGet(); + } + + @Override + public final long counter() { + return counter.get(); + } + + @Override + public void reset() { + counter.set(0); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.metric; + +public class Period { + + long start = System.currentTimeMillis(); + long end; + + public long getStart() { + return start; + } + + public void setStart(long start) { + this.start = start; + } + + public long getEnd() { + if (end == 0) { + end = System.currentTimeMillis(); + } + return end; + } + + public void setEnd(long end) { + this.end = end; + } + + public void reset() { + start = System.currentTimeMillis(); + end = 0; + } + + public long duration() { + return getEnd() - getStart(); + } + + public Float rate(Long counter) { + if (counter == null) { + return null; + } + return ((counter * 1000f) / duration()); + } + + public Float rate(Float counter) { + if (counter == null) { + return null; + } + return ((counter * 1000f) / duration()); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.dispatch.IDispatcher.DispatchContext; +import org.apache.activemq.dispatch.IDispatcher.Dispatchable; +import org.apache.activemq.flow.AbstractLimitedFlowSource; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISinkController.FlowControllable; + +/** + * Base class for a {@link Dispatchable} {@link FlowControllable} + * {@link IFlowQueue}. + * + * @param + */ +public abstract class AbstractFlowQueue extends AbstractLimitedFlowSource implements FlowControllable, IFlowQueue, Dispatchable { + + protected IDispatcher dispatcher; + protected DispatchContext dispatchContext; + protected final Collection> readyListeners = new ArrayList>(); + private boolean notifyReady = false; + protected boolean dispatching = false; + protected int dispatchPriority = 0; + + AbstractFlowQueue() { + super(); + } + + protected AbstractFlowQueue(String name) { + super(name); + } + + public final boolean dispatch() { + + while (pollingDispatch()) + ; + + return true; + + // return !pollingDispatch(); + } + + public final IFlowSink getFlowSink() { + // TODO Auto-generated method stub + return this; + } + + public final IFlowSource getFlowSource() { + // TODO Auto-generated method stub + return this; + } + + protected final FlowControllable getFlowControllableHook() { + return this; + } + + /** + * Sets an asynchronous dispatcher for this source. As elements become + * available they will be dispatched to the worker pool. + * + * @param workers + * The executor thread pool. + * @param dispatcher + * The dispatcher to handle messages. + */ + public synchronized void setDispatcher(IDispatcher dispatcher) { + this.dispatcher = dispatcher; + dispatchContext = dispatcher.register(this, getResourceName()); + dispatchContext.updatePriority(dispatchPriority); + } + + public synchronized final void setDispatchPriority(int priority) { + dispatchPriority = priority; + if (dispatchContext != null) { + dispatchContext.updatePriority(priority); + } + } + + public synchronized void addFlowReadyListener(IPollableFlowSource.FlowReadyListener watcher) { + + readyListeners.add(watcher); + if (isDispatchReady()) { + notifyReady(); + } + } + + /** + * Dispatches an element potentialy blocking until an element is available + * for dispatch. + */ + public final void blockingDispatch() throws InterruptedException { + + while (!pollingDispatch()) { + waitForDispatchReady(); + } + } + + /** + * Indicates that there are elements ready for dispatch. + */ + protected void notifyReady() { + if (dispatchContext != null) { + dispatchContext.requestDispatch(); + return; + } + + synchronized (this) { + if (dispatchContext != null) { + if (!dispatching) { + dispatching = true; + dispatchContext.requestDispatch(); + } + return; + } + + if (notifyReady) { + notify(); + } + + if (!readyListeners.isEmpty()) { + for (FlowReadyListener listener : readyListeners) { + listener.onFlowReady(this); + } + } + + readyListeners.clear(); + } + } + + protected synchronized void waitForDispatchReady() throws InterruptedException { + while (!isDispatchReady()) { + notifyReady = true; + wait(); + } + notifyReady = false; + } + + public String toString() { + return getResourceName(); + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import org.apache.activemq.dispatch.PriorityLinkedList; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.PriorityFlowController; +import org.apache.kahadb.util.LinkedNode; + +/** + */ +public class ExclusivePriorityQueue extends AbstractFlowQueue implements IFlowQueue { + + private final PriorityLinkedList queue; + private Mapper priorityMapper; + + private class PriorityNode extends LinkedNode { + E elem; + int prio; + } + + private final PriorityFlowController controller; + + /** + * Creates a flow queue that can handle multiple flows. + * + * @param priority + * @param flow + * The {@link Flow} + * @param capacity + * @param resume + * @param controller + * The FlowController if this queue is flow controlled: + */ + public ExclusivePriorityQueue(int priority, Flow flow, String name, int capacity, int resume) { + super(name); + this.queue = new PriorityLinkedList(10); + this.controller = new PriorityFlowController(priority, getFlowControllableHook(), flow, this, capacity, resume); + + } + + public boolean offer(E elem, ISourceController source) { + return controller.offer(elem, source); + } + + /** + * Performs a limited add to the queue. + */ + public final void add(E elem, ISourceController source) { + controller.add(elem, source); + } + + /** + * Called when the controller accepts a message for this queue. + */ + public synchronized void flowElemAccepted(ISourceController controller, E elem) { + PriorityNode node = new PriorityNode(); + node.elem = elem; + node.prio = priorityMapper.map(elem); + + queue.add(node, node.prio); + notifyReady(); + } + + public FlowController getFlowController(Flow flow) { + // TODO: + return null; + } + + public boolean isDispatchReady() { + return !queue.isEmpty(); + } + + public boolean pollingDispatch() { + PriorityNode node = null; + synchronized (this) { + node = queue.poll(); + // FIXME the release should really be done after dispatch. + // doing it here saves us from having to resynchronize + // after dispatch, but release limiter space too soon. + if (autoRelease && node != null) { + controller.elementDispatched(node.elem); + } + } + + if (node != null) { + drain.drain(node.elem, controller); + return true; + } else { + return false; + } + } + + public Mapper getPriorityMapper() { + return priorityMapper; + } + + public void setPriorityMapper(Mapper priorityMapper) { + this.priorityMapper = priorityMapper; + controller.setPriorityMapper(priorityMapper); + } + + @Override + public String toString() { + return getResourceName(); + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import java.util.LinkedList; + +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.ISourceController; + +public class ExclusiveQueue extends AbstractFlowQueue { + private final LinkedList queue = new LinkedList(); + private final FlowController controller; + + /** + * Creates a flow queue that can handle multiple flows. + * + * @param flow + * The {@link Flow} + * @param controller + * The FlowController if this queue is flow controlled: + */ + public ExclusiveQueue(Flow flow, String name, IFlowLimiter limiter) { + super(name); + this.controller = new FlowController(getFlowControllableHook(), flow, limiter, this); + super.onFlowOpened(controller); + } + + public boolean offer(E elem, ISourceController source) { + return controller.offer(elem, source); + } + + /** + * Performs a limited add to the queue. + */ + public final void add(E elem, ISourceController source) { + controller.add(elem, source); + } + + /** + * Called when the controller accepts a message for this queue. + */ + public synchronized void flowElemAccepted(ISourceController controller, E elem) { + queue.add(elem); + notifyReady(); + } + + public FlowController getFlowController(Flow flow) { + return controller; + } + + public final boolean isDispatchReady() { + return !queue.isEmpty(); + } + + public final boolean pollingDispatch() { + E elem = null; + synchronized (this) { + elem = queue.poll(); + // FIXME the release should really be done after dispatch. + // doing it here saves us from having to resynchronize + // after dispatch, but release limiter space too soon. + if (autoRelease && elem != null) { + controller.elementDispatched(elem); + } + } + + if (elem != null) { + drain.drain(elem, controller); + return true; + } else { + return false; + } + } + + @Override + public String toString() { + return "SingleFlowQueue:" + getResourceName(); + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.flow.IFlowSource; + +public interface IAsynchronousFlowSource extends IFlowSource { + + /** + * Sets an asynchronous dispatcher for this source. As elements become + * available they will be dispatched to the worker pool. + * + * @param workers + * The executor thread pool. + * @param dispatcher + * The dispatcher to handle messages. + * @param controller + * The controller for the flow to process in the worker pool. + */ + public void setDispatcher(IDispatcher workers); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import org.apache.activemq.flow.IFlowSource; + +public interface IBlockingFlowSource extends IFlowSource { + + /** + * Dispatches the next available element to the source's dispatcher, + * blocking until an element is available. + * + * @throws InterruptedException + */ + public void blockingDispatch() throws InterruptedException; +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import org.apache.activemq.flow.IFlowSink; + +public interface IFlowQueue extends IBlockingFlowSource, IPollableFlowSource, IAsynchronousFlowSource, IFlowSink { + + public void setDispatchPriority(int priority); +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import org.apache.activemq.flow.IFlowSource; + +public interface IPollableFlowSource extends IFlowSource { + + /** + * Callback used to indicate that a PollableFlowSource is ready for + * dispatch. + * + * @param + */ + public interface FlowReadyListener { + public void onFlowReady(IPollableFlowSource source); + } + + /** + * Sets a listener to indicate when there are elements available for + * dispatch from this source. + * + * @param listener + * The listener. + */ + public void addFlowReadyListener(FlowReadyListener listener); + + /** + * Dispatches the next available element returning false if there were no + * elements available for dispatch. + * + * @return False if there were no elements to dispatch. + */ + public boolean pollingDispatch(); + + /** + * @return True if there are elements ready to dispatch. + */ + public boolean isDispatchReady(); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import org.apache.activemq.flow.IFlowSink; + +public interface IQueue extends IFlowSink { + + public void addSubscription(Subscription sub); + + public boolean removeSubscription(Subscription sub); + + public boolean removeByValue(V value); + + public boolean removeByKey(K key); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISinkController; +import org.apache.activemq.flow.ISourceController; +import org.apache.kahadb.util.LinkedNode; +import org.apache.kahadb.util.LinkedNodeList; + +/** + */ +public class LoadBalancedFlowQueue extends AbstractFlowQueue { + private final LinkedList queue = new LinkedList(); + private final LinkedNodeList readyConsumers = new LinkedNodeList(); + private final HashMap, SinkNode> consumers = new HashMap, SinkNode>(); + + private boolean strcitDispatch = true; + + private final FlowController sinkController; + + private final ISourceController sourceControler = new ISourceController() { + + public Flow getFlow() { + return sinkController.getFlow(); + } + + public IFlowSource getFlowSource() { + return LoadBalancedFlowQueue.this; + } + + public void onFlowBlock(ISinkController sink) { + synchronized (LoadBalancedFlowQueue.this) { + SinkNode node = consumers.get(sink); + if (node != null) { + node.unlink(); + } + // controller.onFlowBlock(sink); + } + + } + + public void onFlowResume(ISinkController sink) { + synchronized (LoadBalancedFlowQueue.this) { + SinkNode node = consumers.get(sink); + if (node != null) { + // controller.onFlowResume(sink); + // Add to ready list if not there: + if (!node.isLinked()) { + boolean notify = false; + if (readyConsumers.isEmpty()) { + notify = true; + } + + readyConsumers.addLast(node); + if (notify && !queue.isEmpty()) { + notifyReady(); + } + } + } + } + } + + public void elementDispatched(E elem) { + // TODO Auto-generated method stub + + } + + public boolean isSourceBlocked() { + // TODO Auto-generated method stub + return false; + } + + }; + + /** + * Creates a flow queue that can handle multiple flows. + * + * @param flow + * The {@link Flow} + * @param controller + * The FlowController if this queue is flow controlled: + */ + public LoadBalancedFlowQueue(Flow flow, String name, long resourceId, IFlowLimiter limiter) { + super(name); + this.sinkController = new FlowController(getFlowControllableHook(), flow, limiter, this); + super.onFlowOpened(sinkController); + } + + public boolean offer(E elem, ISourceController source) { + return sinkController.offer(elem, source); + } + + /** + * Performs a limited add to the queue. + */ + public final void add(E elem, ISourceController source) { + sinkController.add(elem, source); + } + + /** + * Called when the controller accepts a message for this queue. + */ + public synchronized void flowElemAccepted(ISourceController controller, E elem) { + queue.add(elem); + if (!readyConsumers.isEmpty()) { + notifyReady(); + } + } + + public FlowController getFlowController(Flow flow) { + return sinkController; + } + + public boolean isDispatchReady() { + return !queue.isEmpty() && !readyConsumers.isEmpty(); + } + + public boolean pollingDispatch() { + if (strcitDispatch) { + return strictPollingDispatch(); + } else { + return loosePollingDispatch(); + } + } + + private boolean strictPollingDispatch() { + + SinkNode node = null; + E elem = null; + synchronized (this) { + if (readyConsumers.isEmpty()) { + return false; + } + // Get the next elem: + elem = queue.peek(); + if (elem == null) { + return false; + } + + node = readyConsumers.getHead(); + } + + while (true) { + + boolean accepted = node.sink.offer(elem, sourceControler); + + synchronized (this) { + if (accepted) { + queue.remove(); + if (autoRelease) { + sinkController.elementDispatched(elem); + } + if (!readyConsumers.isEmpty()) { + readyConsumers.rotate(); + } + return true; + } else { + if (readyConsumers.isEmpty()) { + return false; + } + node = readyConsumers.getHead(); + } + } + } + } + + private boolean loosePollingDispatch() { + E elem = null; + IFlowSink sink = null; + synchronized (this) { + if (readyConsumers.isEmpty()) { + return false; + } + + // Get the next sink: + sink = readyConsumers.getHead().sink; + + // Get the next elem: + elem = queue.poll(); + if (elem == null) { + return false; + } + + readyConsumers.rotate(); + + // FIXME the release should really be done after dispatch. + // doing it here saves us from having to resynchronize + // after dispatch, but releases limiter space too soon. + if (autoRelease) { + sinkController.elementDispatched(elem); + } + + } + + sink.add(elem, sourceControler); + return true; + } + + public final void addSink(IFlowSink sink) { + synchronized (this) { + SinkNode node = consumers.get(sink); + if (node == null) { + node = new SinkNode(sink); + consumers.put(sink, node); + readyConsumers.addLast(node); + if (!queue.isEmpty()) { + notifyReady(); + } + } + } + } + + private class SinkNode extends LinkedNode { + public final IFlowSink sink; + + public SinkNode(IFlowSink sink) { + this.sink = sink; + } + + @Override + public String toString() { + return sink.toString(); + } + } + + public boolean isStrcitDispatch() { + return strcitDispatch; + } + + public void setStrcitDispatch(boolean strcitDispatch) { + this.strcitDispatch = strcitDispatch; + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +public interface Mapper { + K map(V element); +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.kahadb.util.LinkedNode; +import org.apache.kahadb.util.LinkedNodeList; + +public class MemoryStore implements Store { + + AtomicLong counter = new AtomicLong(); + + class MemoryStoreNode extends LinkedNode implements StoreNode { + private Subscription owner; + private final K key; + private final V value; + private long id = counter.getAndIncrement(); + + public MemoryStoreNode(K key, V value) { + this.key = key; + this.value = value; + } + + public boolean acquire(Subscription owner) { + if (this.owner == null) { + this.owner = owner; + } + return true; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + @Override + public String toString() { + return "node:" + id; + } + + public void unacquire() { + this.owner = null; + } + + } + + class MemoryStoreCursor implements StoreCursor { + private MemoryStoreNode last; + private MemoryStoreNode next; + + public MemoryStoreCursor() { + } + + public MemoryStoreCursor(MemoryStoreNode next) { + this.next = next; + } + + public void setNext(StoreNode next) { + this.next = (MemoryStoreNode) next; + } + + public boolean hasNext() { + if (next != null) + return true; + + if (last == null || last.getNextCircular() == last) { + next = (MemoryStoreNode) elements.getHead(); + return next != null; + } + + while (true) { + MemoryStoreNode t = last.getNextCircular(); + if (t.id > last.id) { + next = t; + return true; + } else { + return false; + } + } + } + + public StoreNode peekNext() { + hasNext(); + return next; + } + + public StoreNode next() { + try { + hasNext(); + return next; + } finally { + last = next; + next = null; + } + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + } + + protected HashMap map = new HashMap(); + protected LinkedNodeList elements = new LinkedNodeList(); + + public StoreNode add(K key, V value) { + MemoryStoreNode rc = new MemoryStoreNode(key, value); + map.put(key, rc); + elements.addLast(rc); + return rc; + } + + public StoreNode remove(K key) { + MemoryStoreNode node = (MemoryStoreNode) map.remove(key); + if (node != null) { + node.unlink(); + } + return node; + } + + public boolean isEmpty() { + return elements.isEmpty(); + } + + public org.apache.activemq.queue.Store.StoreCursor openCursor() { + MemoryStoreCursor cursor = new MemoryStoreCursor(); + return cursor; + } + + public org.apache.activemq.queue.Store.StoreCursor openCursorAt(org.apache.activemq.queue.Store.StoreNode next) { + MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next); + return cursor; + } + + public int size() { + return map.size(); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=743476&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Wed Feb 11 20:12:28 2009 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.queue; + +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.kahadb.util.LinkedNode; +import org.apache.kahadb.util.LinkedNodeList; + +public class MultiFlowQueue extends AbstractFlowQueue { + private final HashMap flowQueues = new HashMap(); + private final LinkedNodeList readyQueues = new LinkedNodeList(); + + private final int perFlowWindow; + private final int resumeThreshold; + + public MultiFlowQueue(String name, int perFlowWindow, int resumeThreshold) { + super(name); + this.perFlowWindow = perFlowWindow; + this.resumeThreshold = resumeThreshold; + } + + public final void flowElemAccepted(ISourceController controller, E elem) { + // We don't currently create a flow controller for this, + // so this shouldn't be called. + throw new UnsupportedOperationException(); + } + + public boolean offer(E elem, ISourceController source) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public synchronized void add(E elem, ISourceController source) { + SingleFlowQueue queue = flowQueues.get(source.getFlow()); + if (queue == null) { + queue = new SingleFlowQueue(source.getFlow(), new SizeLimiter(perFlowWindow, resumeThreshold)); + flowQueues.put(source.getFlow(), queue); + super.onFlowOpened(queue.controller); + } + queue.enqueue(elem, source); + } + + public boolean pollingDispatch() { + SingleFlowQueue queue = null; + E elem = null; + synchronized (this) { + queue = peekReadyQueue(); + if (queue == null) { + return false; + } + + elem = queue.poll(); + if (elem == null) { + + unreadyQueue(queue); + return false; + } + + // rotate to have fair dispatch. + queue.getList().rotate(); + } + + drain.drain(elem, queue.controller); + return true; + } + + public final boolean isDispatchReady() { + return !readyQueues.isEmpty(); + } + + private SingleFlowQueue peekReadyQueue() { + if (readyQueues.isEmpty()) { + return null; + } + return readyQueues.getHead(); + } + + private void unreadyQueue(SingleFlowQueue node) { + node.unlink(); + } + + private void addReadyQueue(SingleFlowQueue node) { + readyQueues.addLast(node); + } + + /** + * Limits a flow that has potentially multiple sources. + */ + private class SingleFlowQueue extends LinkedNode implements FlowController.FlowControllable { + private final LinkedList queue = new LinkedList(); + final FlowController controller; + private boolean ready = false; + + SingleFlowQueue(Flow flow, IFlowLimiter limiter) { + this.controller = new FlowController(this, flow, limiter, MultiFlowQueue.this); + } + + final void enqueue(E elem, ISourceController source) { + controller.add(elem, source); + } + + public IFlowSource getFlowSource() { + return MultiFlowQueue.this; + } + + public IFlowSink getFlowSink() { + return MultiFlowQueue.this; + } + + public void flowElemAccepted(ISourceController controller, E elem) { + + synchronized (MultiFlowQueue.this) { + queue.add(elem); + if (!ready) { + addReadyQueue(this); + ready = true; + } + // Always request on new elements: + notifyReady(); + } + } + + private E poll() { + E e = queue.poll(); + if (e == null) { + ready = false; + } else if (autoRelease) { + controller.elementDispatched(e); + } + return e; + } + } + +}