activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r743476 [3/4] - in /activemq/sandbox/activemq-flow: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/act...
Date Wed, 11 Feb 2009 20:12:30 GMT
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
+import org.apache.activemq.flow.ISourceController;
+
+abstract public class PartitionedQueue<P, K, V> extends AbstractLimitedFlowResource<V> implements IQueue<K, V> {
+
+    private HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
+    private HashMap<P, IQueue<K, V>> partitions = new HashMap<P, IQueue<K, V>>();
+    private Mapper<P, V> partitionMapper;
+
+    public IQueue<K, V> getPartition(P partitionKey) {
+        synchronized (partitions) {
+            IQueue<K, V> rc = partitions.get(partitionKey);
+            if (rc == null) {
+                rc = cratePartition(partitionKey);
+                partitions.put(partitionKey, rc);
+                for (Subscription<V> sub : subscriptions) {
+                    rc.addSubscription(sub);
+                }
+            }
+            return rc;
+        }
+    }
+
+    abstract protected IQueue<K, V> cratePartition(P partitionKey);
+
+    public void addSubscription(Subscription<V> sub) {
+        synchronized (partitions) {
+            subscriptions.add(sub);
+            Collection<IQueue<K, V>> values = partitions.values();
+            for (IQueue<K, V> queue : values) {
+                queue.addSubscription(sub);
+            }
+        }
+    }
+
+    public boolean removeSubscription(Subscription<V> sub) {
+        synchronized (partitions) {
+            if (subscriptions.remove(sub)) {
+                Collection<IQueue<K, V>> values = partitions.values();
+                for (IQueue<K, V> queue : values) {
+                    queue.removeSubscription(sub);
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean removeByKey(K key) {
+        synchronized (partitions) {
+            Collection<IQueue<K, V>> values = partitions.values();
+            for (IQueue<K, V> queue : values) {
+                if (queue.removeByKey(key)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public boolean removeByValue(V value) {
+        P partitionKey = partitionMapper.map(value);
+        IQueue<K, V> partition = getPartition(partitionKey);
+        return partition.removeByValue(value);
+    }
+
+    public void setPartitionMapper(Mapper<P, V> partitionMapper) {
+        this.partitionMapper = partitionMapper;
+    }
+
+    public Mapper<P, V> getPartitionMapper() {
+        return partitionMapper;
+    }
+
+    public void add(V value, ISourceController<V> source) {
+        P partitionKey = partitionMapper.map(value);
+        IQueue<K, V> partition = getPartition(partitionKey);
+        partition.add(value, source);
+    }
+
+    public boolean offer(V value, ISourceController<V> source) {
+        P partitionKey = partitionMapper.map(value);
+        IQueue<K, V> partition = getPartition(partitionKey);
+        return partition.offer(value, source);
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
+
+public class SharedPriorityQueue<K, V> extends AbstractLimitedFlowResource<V> implements IQueue<K, V> {
+
+    private final HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
+    private final Mapper<Integer, V> priorityMapper;
+    private final ArrayList<SharedQueue<K, V>> partitions = new ArrayList<SharedQueue<K, V>>();
+    private Mapper<K, V> keyMapper;
+    private boolean autoRelease;
+    private IDispatcher dispatcher;
+    private final PrioritySizeLimiter<V> limiter;
+
+    public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
+        super(name);
+        this.limiter = limiter;
+        priorityMapper = limiter.getPriorityMapper();
+        for (int i = 0; i < limiter.getPriorities(); i++) {
+            partitions.add(null);
+        }
+    }
+
+    public void setResourceName(String resourceName) {
+        super.setResourceName(resourceName);
+    }
+
+    public void addSubscription(Subscription<V> sub) {
+        synchronized (this) {
+            subscriptions.add(sub);
+            for (SharedQueue<K, V> queue : partitions) {
+                if (queue != null) {
+                    queue.addSubscription(sub);
+                }
+            }
+        }
+    }
+
+    public boolean removeSubscription(Subscription<V> sub) {
+        synchronized (this) {
+            if (subscriptions.remove(sub)) {
+                for (SharedQueue<K, V> queue : partitions) {
+                    if (queue != null) {
+                        queue.removeSubscription(sub);
+                    }
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean removeByKey(K key) {
+        synchronized (this) {
+            for (SharedQueue<K, V> queue : partitions) {
+                if (queue.removeByKey(key)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public boolean removeByValue(V value) {
+        int prio = priorityMapper.map(value);
+        IQueue<K, V> partition = getPartition(prio);
+        return partition.removeByValue(value);
+    }
+
+    private IQueue<K, V> getPartition(int prio) {
+        synchronized (this) {
+            SharedQueue<K, V> queue = partitions.get(prio);
+            if (queue == null) {
+                queue = new SharedQueue<K, V>(getResourceName() + ":" + prio, limiter.getPriorityLimter(prio), this);
+                queue.setAutoRelease(autoRelease);
+                queue.setDispatcher(dispatcher);
+                queue.setDispatchPriority(prio);
+                queue.setKeyMapper(keyMapper);
+                partitions.set(prio, queue);
+                onFlowOpened(queue.getFlowControler());
+
+                for (Subscription<V> sub : subscriptions) {
+                    queue.addSubscription(sub);
+                }
+            }
+            return queue;
+        }
+    }
+
+    public void add(V value, ISourceController<V> source) {
+        int prio = priorityMapper.map(value);
+        IQueue<K, V> partition = getPartition(prio);
+        partition.add(value, source);
+    }
+
+    public boolean offer(V value, ISourceController<V> source) {
+        int prio = priorityMapper.map(value);
+        IQueue<K, V> partition = getPartition(prio);
+        return partition.offer(value, source);
+    }
+
+    public void setKeyMapper(Mapper<K, V> keyMapper) {
+        this.keyMapper = keyMapper;
+    }
+
+    public void setAutoRelease(boolean autoRelease) {
+        this.autoRelease = autoRelease;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.Store.StoreCursor;
+import org.apache.activemq.queue.Store.StoreNode;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ * IQueue which does direct dispatch whenever it can.
+ */
+public class SharedQueue<K, V> extends AbstractFlowQueue<V> implements IQueue<K, V> {
+
+    protected Store<K, V> store = new TreeMemoryStore<K, V>();
+
+    private final LinkedNodeList<SubscriptionNode> unreadyDirectSubs = new LinkedNodeList<SubscriptionNode>();
+    private final LinkedNodeList<SubscriptionNode> readyDirectSubs = new LinkedNodeList<SubscriptionNode>();
+
+    private final LinkedNodeList<SubscriptionNode> unreadyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+    private final LinkedNodeList<SubscriptionNode> readyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+
+    private final HashMap<Subscription<V>, SubscriptionNode> subscriptions = new HashMap<Subscription<V>, SubscriptionNode>();
+    private final HashMap<IFlowSink<V>, SubscriptionNode> sinks = new HashMap<IFlowSink<V>, SubscriptionNode>();
+
+    private final FlowController<V> sinkController;
+    private final Object mutex;
+    private final AbstractFlowQueue whoToWakeup = this;
+
+    protected Mapper<K, V> keyMapper;
+    private long directs;
+
+    private final ISourceController<V> sourceControler = new ISourceController<V>() {
+
+        public Flow getFlow() {
+            return sinkController.getFlow();
+        }
+
+        public void elementDispatched(V elem) {
+        }
+
+        public void onFlowBlock(ISinkController<V> sink) {
+        }
+
+        public void onFlowResume(ISinkController<V> sinkController) {
+            IFlowSink<V> sink = sinkController.getFlowSink();
+            synchronized (mutex) {
+                SubscriptionNode node = sinks.get(sink);
+                if (node != null) {
+                    node.unlink();
+                    boolean notify = false;
+                    if (node.cursor == null) {
+                        readyDirectSubs.addLast(node);
+                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+                    } else {
+                        if (readyPollingSubs.isEmpty()) {
+                            notify = !store.isEmpty();
+                        }
+                        readyPollingSubs.addLast(node);
+                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+                    }
+                    if (notify) {
+                        notifyReady();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return getResourceName();
+        }
+
+        public boolean isSourceBlocked() {
+            throw new UnsupportedOperationException();
+        }
+
+        public IFlowSource<V> getFlowSource() {
+            return SharedQueue.this;
+        }
+
+    };
+
+    public SharedQueue(String name, IFlowLimiter<V> limiter) {
+        this(name, limiter, new Object());
+    }
+
+    /**
+     * Creates a flow queue that can handle multiple flows.
+     * 
+     * @param flow
+     *            The {@link Flow}
+     * @param controller
+     *            The FlowController if this queue is flow controlled:
+     */
+    public SharedQueue(String name, IFlowLimiter<V> limiter, Object mutex) {
+        super(name);
+        this.mutex = mutex;
+        Flow flow = new Flow(name, false);
+        this.sinkController = new FlowController<V>(getFlowControllableHook(), flow, limiter, mutex);
+        super.onFlowOpened(sinkController);
+    }
+
+    public boolean offer(V elem, ISourceController<V> source) {
+        return sinkController.offer(elem, source);
+    }
+
+    /**
+     * Performs a limited add to the queue.
+     */
+    public final void add(V value, ISourceController<V> source) {
+        sinkController.add(value, source);
+    }
+
+    /**
+     * Called when the controller accepts a message for this queue.
+     */
+    public void flowElemAccepted(ISourceController<V> controller, V value) {
+        synchronized (mutex) {
+
+            // Try to directly dispatch to one of the attached subscriptions
+            // sourceDispatch returns null on successful dispatch
+            ArrayList<SubscriptionNode> matches = directDispatch(value);
+            if (matches != null) {
+
+                if (directs != 0) {
+                    // System.out.println("could not directly dispatch.. had directly dispatched: "+directs);
+                    directs = 0;
+                }
+
+                K key = keyMapper.map(value);
+                StoreNode<K, V> node = store.add(key, value);
+
+                int matchCount = 0;
+                // Go through the un-ready direct subs and find out if any those
+                // would
+                // have matched the message, and if so then set it up to cursor
+                // from
+                // it.
+                SubscriptionNode sub = unreadyDirectSubs.getHead();
+                while (sub != null) {
+                    SubscriptionNode next = sub.getNext();
+                    if (sub.subscription.matches(value)) {
+                        sub.unlink();
+                        sub.resumeAt(node);
+                        unreadyPollingSubs.addLast(sub);
+                        matchCount++;
+                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+                    }
+                    sub = next;
+                }
+
+                // Also do it for all the ready nodes that matched... but which
+                // we
+                // could not enqueue to.
+                for (SubscriptionNode subNode : matches) {
+                    subNode.unlink();
+                    subNode.resumeAt(node);
+                    unreadyPollingSubs.addLast(subNode);
+                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+                }
+                matchCount += matches.size();
+
+                if (matchCount > 0) {
+                    // We have interested subscriptions for the message.. but
+                    // they are not ready to receive.
+                    // Would be cool if we could flow control the source.
+                }
+
+                if (!readyPollingSubs.isEmpty()) {
+                    notifyReady();
+                }
+            } else {
+                directs++;
+            }
+        }
+    }
+
+    public FlowController<V> getFlowController(Flow flow) {
+        return sinkController;
+    }
+
+    public boolean isDispatchReady() {
+        return !store.isEmpty() && !readyPollingSubs.isEmpty();
+    }
+
+    private ArrayList<SubscriptionNode> directDispatch(V elem) {
+        ArrayList<SubscriptionNode> matches = new ArrayList<SubscriptionNode>(readyDirectSubs.size());
+        boolean accepted = false;
+        SubscriptionNode next = null;
+        SubscriptionNode node = readyDirectSubs.getHead();
+        while (node != null) {
+            next = node.getNext();
+            if (node.subscription.matches(elem)) {
+                accepted = node.subscription.getSink().offer(elem, sourceControler);
+                if (accepted) {
+                    if (autoRelease) {
+                        sinkController.elementDispatched(elem);
+                    }
+                    break;
+                } else {
+                    matches.add(node);
+                }
+            }
+            node = next;
+        }
+        if (next != null) {
+            readyDirectSubs.rotateTo(next);
+        }
+        return accepted ? null : matches;
+    }
+
+    public boolean pollingDispatch() {
+
+        // Keep looping until we can find one subscription that we can
+        // dispatch a message to.
+        while (true) {
+
+            // Find a subscription that has a message available for dispatch.
+            SubscriptionNode subNode = null;
+            StoreNode<K, V> storeNode = null;
+            synchronized (mutex) {
+
+                if (readyPollingSubs.isEmpty()) {
+                    return false;
+                }
+
+                SubscriptionNode next = null;
+                subNode = readyPollingSubs.getHead();
+                while (subNode != null) {
+                    next = subNode.getNext();
+
+                    storeNode = subNode.cursorPeek();
+                    if (storeNode != null) {
+                        // Found a message..
+                        break;
+                    } else {
+                        // Cursor dried up... this subscriber can now be direct
+                        // dispatched.
+                        // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
+                        subNode.unlink();
+                        readyDirectSubs.addLast(subNode);
+                    }
+                    subNode = next;
+                }
+
+                if (storeNode == null) {
+                    return false;
+                }
+
+                if (next != null) {
+                    readyPollingSubs.rotateTo(next);
+                }
+            }
+
+            // The subscription's sink may be full..
+            IFlowSink<V> sink = subNode.subscription.getSink();
+            boolean accepted = sink.offer(storeNode.getValue(), sourceControler);
+
+            synchronized (mutex) {
+                if (accepted) {
+                    subNode.cursorNext();
+                    if (subNode.subscription.isPreAcquired() && subNode.subscription.isRemoveOnDispatch()) {
+                        StoreNode<K, V> removed = store.remove(storeNode.getKey());
+                        assert removed != null : "Since the node was aquired.. it should not have been removed by anyone else.";
+                        sinkController.elementDispatched(storeNode.getValue());
+                    }
+                    return true;
+                } else {
+                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+                    // Subscription is no longer ready..
+                    subNode.cursorUnPeek(storeNode);
+                    subNode.unlink();
+                    unreadyPollingSubs.addLast(subNode);
+                }
+            }
+        }
+    }
+
+    public void addSubscription(Subscription<V> subscription) {
+        synchronized (mutex) {
+            SubscriptionNode node = subscriptions.get(subscription);
+            if (node == null) {
+                node = new SubscriptionNode(subscription);
+                subscriptions.put(subscription, node);
+                sinks.put(subscription.getSink(), node);
+                if (!store.isEmpty()) {
+                    readyPollingSubs.addLast(node);
+                    notifyReady();
+                } else {
+                    readyDirectSubs.addLast(node);
+                }
+            }
+        }
+    }
+
+    public boolean removeSubscription(Subscription<V> subscription) {
+        synchronized (mutex) {
+            SubscriptionNode node = subscriptions.remove(subscription);
+            if (node != null) {
+                sinks.remove(subscription.getSink());
+                node.unlink();
+                return true;
+            }
+            return false;
+        }
+    }
+
+    private class SubscriptionNode extends LinkedNode<SubscriptionNode> {
+        public final Subscription<V> subscription;
+        public StoreCursor<K, V> cursor;
+
+        public SubscriptionNode(Subscription<V> subscription) {
+            this.subscription = subscription;
+            this.cursor = store.openCursor();
+        }
+
+        public void resumeAt(StoreNode<K, V> node) {
+            this.cursor = store.openCursorAt(node);
+        }
+
+        public void cursorNext() {
+            cursor.next();
+        }
+
+        public StoreNode<K, V> cursorPeek() {
+            if (cursor == null) {
+                return null;
+            }
+            while (cursor.hasNext()) {
+                StoreNode<K, V> elemNode = cursor.peekNext();
+
+                // Skip over messages that are not a match.
+                if (!subscription.matches(elemNode.getValue())) {
+                    cursor.next();
+                    continue;
+                }
+
+                if (subscription.isPreAcquired()) {
+                    if (elemNode.acquire(subscription)) {
+                        return elemNode;
+                    } else {
+                        cursor.next();
+                        continue;
+                    }
+                }
+            }
+            cursor = null;
+            return null;
+        }
+
+        public void cursorUnPeek(StoreNode<K, V> node) {
+            if (subscription.isPreAcquired()) {
+                node.unacquire();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "subscription from " + getResourceName() + " to " + subscription;
+        }
+    }
+
+    public Mapper<K, V> getKeyMapper() {
+        return keyMapper;
+    }
+
+    public void setKeyMapper(Mapper<K, V> keyMapper) {
+        this.keyMapper = keyMapper;
+    }
+
+    public boolean removeByKey(K key) {
+        return false;
+    }
+
+    public boolean removeByValue(V value) {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return getResourceName();
+    }
+
+    public FlowController<V> getFlowControler() {
+        return this.sinkController;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.dispatch.PriorityLinkedList;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ */
+public class SingleFlowPriorityQueue<E> extends AbstractFlowQueue<E> {
+
+    private final PriorityLinkedList<PriorityNode> queue;
+    private Mapper<Integer, E> priorityMapper;
+    private final FlowController<E> controller;
+
+    private class PriorityNode extends LinkedNode<PriorityNode> {
+        E elem;
+        int prio;
+    }
+
+    private int messagePriority = 0;
+
+    /**
+     * Creates a flow queue that can handle multiple flows.
+     * 
+     * @param flow
+     *            The {@link Flow}
+     * @param controller
+     *            The FlowController if this queue is flow controlled:
+     */
+    public SingleFlowPriorityQueue(Flow flow, String name, IFlowLimiter<E> limiter) {
+        super(name);
+        this.queue = new PriorityLinkedList<PriorityNode>(10);
+        this.controller = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
+        super.onFlowOpened(controller);
+    }
+
+    public boolean offer(E elem, ISourceController<E> source) {
+        return controller.offer(elem, source);
+    }
+
+    /**
+     * Performs a limited add to the queue.
+     */
+    public final void add(E elem, ISourceController<E> source) {
+        controller.add(elem, source);
+    }
+
+    /**
+     * Called when the controller accepts a message for this queue.
+     */
+    public final void flowElemAccepted(ISourceController<E> controller, E elem) {
+        PriorityNode node = new PriorityNode();
+        node.elem = elem;
+        node.prio = priorityMapper.map(elem);
+
+        synchronized (this) {
+            queue.add(node, node.prio);
+            updatePriority();
+            notifyReady();
+        }
+    }
+
+    public FlowController<E> getFlowController(Flow flow) {
+        return controller;
+    }
+
+    public final boolean isDispatchReady() {
+        return !queue.isEmpty();
+    }
+
+    public boolean pollingDispatch() {
+        PriorityNode elem = null;
+        synchronized (this) {
+            elem = queue.poll();
+            updatePriority();
+            // FIXME the release should really be done after dispatch.
+            // doing it here saves us from having to resynchronize
+            // after dispatch, but release limiter space too soon.
+            if (autoRelease && elem != null) {
+                controller.elementDispatched(elem.elem);
+            }
+        }
+
+        if (elem != null) {
+            drain.drain(elem.elem, controller);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private final void updatePriority() {
+        if (dispatchContext != null) {
+            int newPrio = Math.max(queue.getHighestPriority(), dispatchPriority);
+            if (messagePriority != newPrio) {
+                messagePriority = newPrio;
+                dispatchContext.updatePriority(messagePriority);
+            }
+
+        }
+    }
+
+    public Mapper<Integer, E> getPriorityMapper() {
+        return priorityMapper;
+    }
+
+    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.Iterator;
+
+public interface Store<K, V> {
+
+    public interface StoreNode<K, V> {
+
+        public boolean acquire(Subscription<V> ownerId);
+
+        public void unacquire();
+
+        public V getValue();
+
+        public K getKey();
+
+    }
+
+    public interface StoreCursor<K, V> extends Iterator<StoreNode<K, V>> {
+        public StoreNode<K, V> peekNext();
+
+        public void setNext(StoreNode<K, V> node);
+
+    }
+
+    StoreNode<K, V> remove(K key);
+
+    StoreNode<K, V> add(K key, V value);
+
+    StoreCursor<K, V> openCursor();
+
+    StoreCursor<K, V> openCursorAt(StoreNode<K, V> next);
+
+    boolean isEmpty();
+
+    int size();
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSink;
+
+public interface Subscription<E> {
+
+    public boolean isRemoveOnDispatch();
+
+    public boolean isPreAcquired();
+
+    public boolean matches(E message);
+
+    public IFlowSink<E> getSink();
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TreeMemoryStore<K, V> implements Store<K, V> {
+
+    AtomicLong counter = new AtomicLong();
+
+    class MemoryStoreNode implements StoreNode<K, V> {
+        private Subscription<V> owner;
+        private final K key;
+        private final V value;
+        private long id = counter.getAndIncrement();
+
+        public MemoryStoreNode(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public boolean acquire(Subscription<V> owner) {
+            if (this.owner == null) {
+                this.owner = owner;
+                return true;
+            }
+            return false;
+        }
+
+        public K getKey() {
+            return key;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return "node:" + id + ", owner=" + owner;
+        }
+
+        public void unacquire() {
+            this.owner = null;
+        }
+
+    }
+
+    class MemoryStoreCursor implements StoreCursor<K, V> {
+        private long last = -1;
+        private MemoryStoreNode next;
+
+        public MemoryStoreCursor() {
+        }
+
+        public MemoryStoreCursor(MemoryStoreNode next) {
+            this.next = next;
+        }
+
+        public void setNext(StoreNode<K, V> next) {
+            this.next = (MemoryStoreNode) next;
+        }
+
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            SortedMap<Long, MemoryStoreNode> m = order.tailMap(last + 1);
+            if (m.isEmpty()) {
+                next = null;
+            } else {
+                next = m.get(m.firstKey());
+            }
+            return next != null;
+        }
+
+        public StoreNode<K, V> peekNext() {
+            hasNext();
+            return next;
+        }
+
+        public StoreNode<K, V> next() {
+            try {
+                hasNext();
+                return next;
+            } finally {
+                last = next.id;
+                next = null;
+            }
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
+    protected TreeMap<Long, MemoryStoreNode> order = new TreeMap<Long, MemoryStoreNode>();
+
+    public StoreNode<K, V> add(K key, V value) {
+        MemoryStoreNode rc = new MemoryStoreNode(key, value);
+        MemoryStoreNode oldNode = map.put(key, rc);
+        if (oldNode != null) {
+            map.put(key, oldNode);
+            throw new IllegalArgumentException("Duplicate key violation");
+        }
+        order.put(rc.id, rc);
+        return rc;
+    }
+
+    public StoreNode<K, V> remove(K key) {
+        MemoryStoreNode node = (MemoryStoreNode) map.remove(key);
+        if (node != null) {
+            order.remove(node.id);
+        }
+        return node;
+    }
+
+    public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursor() {
+        MemoryStoreCursor cursor = new MemoryStoreCursor();
+        return cursor;
+    }
+
+    public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursorAt(org.apache.activemq.queue.Store.StoreNode<K, V> next) {
+        MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next);
+        return cursor;
+    }
+
+    public int size() {
+        return map.size();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.AbstractLimitedFlowSource;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowResource;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.MockBrokerTest.MockBroker;
+import org.apache.activemq.flow.IFlowResource.FlowLifeCycleListener;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.queue.ExclusivePriorityQueue;
+import org.apache.activemq.queue.IAsynchronousFlowSource;
+import org.apache.activemq.queue.IBlockingFlowSource;
+import org.apache.activemq.queue.IFlowQueue;
+import org.apache.activemq.queue.IPollableFlowSource;
+import org.apache.activemq.queue.SingleFlowPriorityQueue;
+
+public abstract class AbstractTestConnection implements Service {
+    protected final IFlowQueue<Message> output;
+    protected final NetworkSource input;
+    protected final MockBroker broker;
+    protected final String name;
+    protected final Flow flow;
+
+    private AtomicBoolean running = new AtomicBoolean();
+    private Thread listener;
+    private Thread sender;
+
+    private final int outputQueueSize = 1000;
+    private final int resumeThreshold = 500;
+
+    private final int inputWindowSize = 1000;
+    private final int inputResumeThreshold = 900;
+
+    public static final int BLOCKING = 0;
+    public static final int POLLING = 1;
+    public static final int ASYNC = 2;
+    private final int dispatchMode;
+
+    AbstractTestConnection(MockBroker broker, String name, Flow flow, Pipe<Message> p) {
+        this.name = name;
+        this.broker = broker;
+        this.flow = flow;
+        this.dispatchMode = broker.dispatchMode;
+
+        // Set up an input source:
+        this.input = new NetworkSource(flow, name + "-INPUT", inputWindowSize, inputResumeThreshold);
+
+        // Setup output queue:
+        if (broker.priorityLevels <= 1) {
+            this.output = broker.getFlowManager().createFlowQueue(flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
+        } else {
+            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
+            t.setPriorityMapper(Message.PRIORITY_MAPPER);
+            this.output = t;
+        }
+
+        output.setDrain(new IFlowDrain<Message>() {
+            public void drain(Message m, ISourceController<Message> controller) {
+                try {
+                    write(m, controller);
+                } catch (InterruptedException e) {
+                    // TODO Auto-generated catch block
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        // We must watch the output for open and close of flows and communicate
+        // it
+        // to the peer.
+        output.addFlowLifeCycleListener(new FlowLifeCycleListener() {
+            public void onFlowClosed(IFlowResource resource, Flow flow) {
+                try {
+                    write(new FlowClose(flow), null);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            public void onFlowOpened(IFlowResource resource, Flow flow) {
+                try {
+                    // Set the limiter to a WindowedLimiter capable of handling
+                    // flow control messages from the peer:
+                    output.getFlowController(flow).setLimiter(new WindowLimiter<Message>(true, flow, outputQueueSize, resumeThreshold));
+                    // Tell the other side that we've open a flow.
+                    write(new FlowOpen(flow), null);
+
+                    FlowController<Message> controller = output.getFlowController(flow);
+                    if (controller != null) {
+                        controller.setLimiter(new WindowLimiter<Message>(true, flow, outputQueueSize, resumeThreshold));
+                        // Tell the other side that we've open a flow.
+                        write(new FlowOpen(flow), null);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+        });
+
+        output.setDispatchPriority(0);
+
+    }
+
+    public final void simulateEncodingWork() {
+        if (broker.ioWorkAmount > 1) {
+            fib(broker.ioWorkAmount);
+        }
+    }
+
+    public final void fib(int n) {
+        if (n > 1) {
+            fib(n - 1);
+            fib(n - 2);
+        }
+    }
+
+    private interface ProtocolLimiter {
+        public void onProtocolMessage(Message m);
+    }
+
+    private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter {
+        final Flow flow;
+        final boolean clientMode;
+        private int available;
+
+        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+            super(capacity, resumeThreshold);
+            this.clientMode = clientMode;
+            this.flow = flow;
+        }
+
+        protected void remove(int size) {
+            super.remove(size);
+            if (!clientMode) {
+                available += size;
+                if (available > capacity - resumeThreshold) {
+                    try {
+                        write(new FlowMessage(flow, available, 0), null);
+                    } catch (InterruptedException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                    available = 0;
+                }
+            }
+        }
+
+        public void onProtocolMessage(Message m) {
+            if (m.type() == Message.TYPE_FLOW_CONTROL) {
+                FlowMessage fm = (FlowMessage) m;
+                super.remove(fm.size);
+            }
+        }
+
+        public int getElementSize(Message m) {
+            return m.getFlowLimiterSize();
+        }
+    }
+
+    public final void start() throws Exception {
+        running.set(true);
+        if (dispatchMode == BLOCKING) {
+            listener = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        while (true) {
+                            input.blockingDispatch();
+                        }
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                }
+            }, name + "-Listener");
+            listener.start();
+            sender = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        while (true) {
+                            output.blockingDispatch();
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }, name + "-Sender");
+            sender.start();
+        } else {
+            output.setDispatcher(broker.dispatcher);
+            input.setDispatcher(broker.dispatcher);
+            return;
+        }
+
+    }
+
+    public final void stop() throws Exception {
+        running.set(false);
+        if (dispatchMode == BLOCKING) {
+            listener.interrupt();
+            listener.join();
+            sender.interrupt();
+            sender.join();
+        }
+    }
+
+    protected abstract Message getNextMessage() throws InterruptedException;
+
+    protected abstract Message pollNextMessage();
+
+    protected abstract void addReadReadyListener(ReadReadyListener listener);
+
+    /**
+     * Must be implemented to write a message.
+     * 
+     * @param m
+     */
+    protected abstract void write(Message m, ISourceController<Message> controller) throws InterruptedException;
+
+    /**
+     * Handles a received message.
+     */
+    protected abstract void messageReceived(Message m, ISourceController<Message> controller);
+
+    /**
+     * Simulates a network source of messages.
+     * 
+     * @param <E>
+     */
+    protected class NetworkSource extends AbstractLimitedFlowSource<Message> implements IBlockingFlowSource<Message>, IAsynchronousFlowSource<Message>, IPollableFlowSource<Message>,
+            FlowControllable<Message> {
+        private final FlowController<Message> flowController;
+        private final Flow flow;
+        private final IFlowQueue<Message> inputQueue;
+        private DispatchContext dispatchContext;
+
+        public NetworkSource(Flow flow, String name, int capacity, int resumeThreshold) {
+            super(name);
+            if (flow == null) {
+                if (broker.useInputQueues) {
+                    inputQueue = broker.getFlowManager().createFlowQueue(flow, name, capacity, resumeThreshold);
+                } else {
+                    inputQueue = null;
+                }
+                flowController = null;
+            } else {
+                if (broker.useInputQueues) {
+                    if (broker.priorityLevels <= 1) {
+                        inputQueue = broker.getFlowManager().createFlowQueue(flow, name, capacity, resumeThreshold);
+                    } else {
+                        SingleFlowPriorityQueue<Message> t = new SingleFlowPriorityQueue<Message>(flow, name, new SizeLimiter<Message>(capacity, resumeThreshold));
+                        t.setPriorityMapper(Message.PRIORITY_MAPPER);
+                        inputQueue = t;
+                    }
+                    flowController = inputQueue.getFlowController(flow);
+                    // Allow overflow we should be limited by protocol:
+                    flowController.useOverFlowQueue(false);
+                    flowController.setLimiter(new SizeLimiter<Message>(capacity, resumeThreshold));
+                    super.onFlowOpened(flowController);
+                } else {
+                    inputQueue = null;
+                    SizeLimiter<Message> limiter = new SizeLimiter<Message>(capacity, resumeThreshold);
+                    flowController = new FlowController<Message>(this, flow, limiter, this);
+                    // Allow overflow we should be limited by protocol:
+                    flowController.useOverFlowQueue(false);
+                    super.onFlowOpened(flowController);
+                }
+            }
+            this.flow = flow;
+        }
+
+        public synchronized void setDispatcher(final IDispatcher dispatcher) {
+
+            if (inputQueue != null) {
+                inputQueue.setDrain(new IFlowDrain<Message>() {
+
+                    public void drain(Message elem, ISourceController<Message> controller) {
+                        messageReceived(elem, controller);
+                        controller.elementDispatched(elem);
+                    }
+
+                });
+                inputQueue.setDispatcher(dispatcher);
+                inputQueue.setDispatchPriority(0);
+            }
+
+            dispatchContext = dispatcher.register(new Dispatchable() {
+                public boolean dispatch() {
+                    if (!pollingDispatch()) {
+                        addReadReadyListener(new ReadReadyListener() {
+                            public void onReadReady() {
+                                if (running.get()) {
+                                    dispatchContext.requestDispatch();
+                                }
+                            }
+                        });
+                        return true;
+                    }
+                    return false;
+                }
+            }, name + "-IOInbound");
+
+            // For reading messages assume maximum priority: These are placed
+            // into
+            // the input queue, where message priority will dictate broker
+            // dispatch
+            // priority. Note that flow control from the input queue will limit
+            // dispatch
+            // of lower priority messages:
+            if (broker.useInputQueues) {
+                dispatchContext.updatePriority(Message.MAX_PRIORITY);
+            }
+            dispatchContext.requestDispatch();
+
+        }
+
+        public FlowController<Message> getFlowController(Flow flow) {
+            if (this.flow != null) {
+                return flowController;
+            } else {
+                return super.getFlowController(flow);
+            }
+        }
+
+        public void blockingDispatch() throws InterruptedException {
+            Message m = getNextMessage();
+            dispatch(m, getFlowController(m.getFlow()));
+        }
+
+        public void dispatch(Message m, FlowController<Message> controller) {
+
+            switch (m.type()) {
+            case Message.TYPE_FLOW_CONTROL: {
+                FlowMessage fm = (FlowMessage) m;
+                synchronized (output) {
+                    try {
+                        FlowController<Message> outputController = output.getFlowController(fm.getFlow());
+                        ProtocolLimiter pl = (ProtocolLimiter) outputController.getLimiter();
+                        synchronized (output) {
+                            pl.onProtocolMessage(fm);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                break;
+            }
+            case Message.TYPE_FLOW_OPEN: {
+                FlowController<Message> inputController = new FlowController<Message>(this, m.getFlow(), new WindowLimiter<Message>(false, m.getFlow(), inputWindowSize, inputResumeThreshold), this);
+                // Allow overflow we should be limited by protocol:
+                inputController.useOverFlowQueue(false);
+                super.onFlowOpened(inputController);
+                break;
+            }
+            case Message.TYPE_FLOW_CLOSE: {
+                super.onFlowClosed(m.getFlow());
+                break;
+            }
+            default: {
+                if (inputQueue != null) {
+                    inputQueue.add(m, null);
+                } else {
+                    controller.add(m, null);
+                }
+            }
+            }
+        }
+
+        public void addFlowReadyListener(final IPollableFlowSource.FlowReadyListener<Message> listener) {
+            addReadReadyListener(new ReadReadyListener() {
+
+                public void onReadReady() {
+                    listener.onFlowReady(NetworkSource.this);
+                }
+
+            });
+        }
+
+        public boolean pollingDispatch() {
+
+            Message m = pollNextMessage();
+            if (m != null) {
+                dispatch(m, getFlowController(m.getFlow()));
+                return true;
+            }
+            return false;
+        }
+
+        // Called by FlowController.add()....
+        public void flowElemAccepted(final ISourceController<Message> controller, final Message elem) {
+            messageReceived(elem, controller);
+            controller.elementDispatched(elem);
+        }
+
+        public IFlowSink<Message> getFlowSink() {
+            // No sink, this is a source only:
+            return null;
+        }
+
+        public IFlowSource<Message> getFlowSource() {
+            return this;
+        }
+
+        public boolean isDispatchReady() {
+            return true;
+        }
+    }
+
+    public class FlowMessage extends Message {
+        int size;
+        int count;
+
+        FlowMessage(Flow flow, int size, int count) {
+            super(0, 0, null, flow, null, 0);
+            this.size = size;
+            this.count = count;
+        }
+
+        public short type() {
+            return TYPE_FLOW_CONTROL;
+        }
+
+        public boolean isSystem() {
+            return true;
+        }
+
+        public int getSize() {
+            return size;
+        }
+    }
+
+    public class FlowOpen extends Message {
+        FlowOpen(Flow flow) {
+            super(0, 0, null, flow, null, 0);
+        }
+
+        public short type() {
+            return TYPE_FLOW_OPEN;
+        }
+
+        public boolean isSystem() {
+            return true;
+        }
+    }
+
+    public class FlowClose extends Message {
+        FlowClose(Flow flow) {
+            super(0, 0, null, flow, null, 0);
+        }
+
+        public short type() {
+            return TYPE_FLOW_CLOSE;
+        }
+
+        public boolean isSystem() {
+            return true;
+        }
+    }
+
+    public interface ReadReadyListener {
+        public void onReadReady();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public class Destination {
+
+    String name;
+
+    Destination(String name) {
+        this.name = name;
+    }
+
+    public final String getName() {
+        return name;
+    }
+
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        } else if (o instanceof Destination) {
+            return equals((Destination) o);
+        }
+        return false;
+    }
+
+    public boolean equals(Destination d) {
+        return name.equals(d.name);
+    }
+
+    public int hashCode() {
+        return name.hashCode();
+    }
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.HashSet;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.queue.Mapper;
+
+public class Message {
+
+    public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
+        public Integer map(Message element) {
+            return element.priority;
+        }
+    };
+
+    public static final int MAX_USER_PRIORITY = 10;
+    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+    public static final int SYSTEM_PRIORITY = MAX_PRIORITY;
+
+    public static final short TYPE_NORMAL = 0;
+    public static final short TYPE_FLOW_CONTROL = 1;
+    public static final short TYPE_FLOW_OPEN = 2;
+    public static final short TYPE_FLOW_CLOSE = 3;
+
+    final String msg;
+    final Flow flow;
+    final Destination dest;
+    int hopCount;
+    HashSet<String> matchProps;
+    final long msgId;
+    final int producerId;
+    final int priority;
+
+    Message(long msgId, int producerId, String msg, Flow flow, Destination dest, int priority) {
+        this.msgId = msgId;
+        this.producerId = producerId;
+        this.msg = msg;
+        this.flow = flow;
+        this.dest = dest;
+        this.priority = priority;
+        hopCount = 0;
+    }
+
+    Message(Message m) {
+        this.msgId = m.msgId;
+        this.producerId = m.producerId;
+        this.msg = m.msg;
+        this.flow = m.flow;
+        this.dest = m.dest;
+        this.matchProps = m.matchProps;
+        this.priority = m.priority;
+        hopCount = m.hopCount;
+    }
+
+    public short type() {
+        return TYPE_NORMAL;
+    }
+
+    public void setProperty(String matchProp) {
+        if (matchProps == null) {
+            matchProps = new HashSet<String>();
+        }
+        matchProps.add(matchProp);
+    }
+
+    public boolean match(String matchProp) {
+        if (matchProps == null) {
+            return false;
+        }
+        return matchProps.contains(matchProp);
+    }
+
+    public boolean isSystem() {
+        return false;
+    }
+
+    public void incrementHopCount() {
+        hopCount++;
+    }
+
+    public final int getHopCount() {
+        return hopCount;
+    }
+
+    public final Destination getDestination() {
+        return dest;
+    }
+
+    public Flow getFlow() {
+        return flow;
+    }
+
+    public int getFlowLimiterSize() {
+        return 1;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public String toString() {
+        return "Message: " + msg + " flow + " + flow + " dest: " + dest;
+    }
+
+    public long getMsgId() {
+        return msgId;
+    }
+
+    public int getProducerId() {
+        return producerId;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public interface MessageGenerator {
+    interface MessageReadyListener {
+        public void onMessageReady(Message m);
+    }
+
+    public void addMessageReadyListener(MessageReadyListener listener);
+
+    public Message pollMessage();
+
+}



Mime
View raw message