activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r769099 [3/5] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Mon, 27 Apr 2009 18:40:49 GMT
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Mon Apr 27 18:40:44 2009
@@ -17,8 +17,10 @@
 package org.apache.activemq.broker.store.memory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -26,14 +28,16 @@
 import java.io.File;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 
-
 /**
- * An in memory implementation of the {@link Store} interface. It does not
+ * An in memory implementation of the {@link QueueStore} interface. It does not
  * properly roll back operations if an error occurs in the middle of a
  * transaction and it does not persist changes across restarts.
  */
@@ -85,45 +89,137 @@
     }
 
     static private class StoredQueue {
-        long sequence;
+        QueueStore.QueueDescriptor descriptor;
+
         TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
         // Maps tracking to sequence number:
         HashMap<Long, Long> trackingMap = new HashMap<Long, Long>();
+        int count = 0;
+        long size = 0;
+        HashMap<QueueStore.QueueDescriptor, StoredQueue> partitions;
+        StoredQueue parent;
+
+        StoredQueue(QueueStore.QueueDescriptor descriptor) {
+            this.descriptor = descriptor.copy();
+        }
 
-        public Long add(QueueRecord record) {
-            long sequenceKey = ++sequence;
-            record.setQueueKey(sequenceKey);
-            records.put(sequenceKey, record);
+        public void add(QueueRecord record) {
+            records.put(record.getQueueKey(), record);
             trackingMap.put(record.getMessageKey(), record.getQueueKey());
-            return sequenceKey;
+            count++;
+            size += record.getSize();
         }
 
         public boolean remove(Long msgKey) {
             Long sequenceKey = trackingMap.remove(msgKey);
             if (sequenceKey != null) {
-                records.remove(sequenceKey);
+                QueueRecord record = records.remove(sequenceKey);
+                count--;
+                size -= record.getSize();
                 return true;
             }
             return false;
         }
 
-        public Iterator<QueueRecord> list(Long firstQueueKey, int max) {
-            ArrayList<QueueRecord> list = new ArrayList<QueueRecord>(max);
+        public Iterator<QueueRecord> list(Long firstQueueKey, long maxSequence, int max) {
+            Collection<QueueRecord> list;
+            if (max < 0) {
+                list = new LinkedList<QueueRecord>();
+            } else {
+                list = new ArrayList<QueueRecord>(max);
+            }
+            
             for (Long key : records.tailMap(firstQueueKey).keySet()) {
-                if (list.size() >= max) {
+                if ((max >= 0 && list.size() >= max) || (maxSequence >= 0 && key > maxSequence)) {
                     break;
                 }
                 list.add(records.get(key));
             }
             return list.iterator();
         }
+
+        public int getCount() {
+            return count;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public void setParent(StoredQueue parent) {
+            this.parent = parent;
+            if (parent == null) {
+                this.descriptor.setParent(null);
+            } else {
+                this.descriptor.setParent(parent.getQueueName());
+            }
+        }
+
+        public StoredQueue getParent() {
+            return parent;
+        }
+
+        public void addPartition(StoredQueue child) {
+            if (partitions == null) {
+                partitions = new HashMap<QueueStore.QueueDescriptor, StoredQueue>();
+            }
+
+            partitions.put(child.getDescriptor(), child);
+        }
+
+        public boolean removePartition(StoredQueue name) {
+            if (partitions == null) {
+                return false;
+            }
+
+            StoredQueue old = partitions.remove(name);
+            if (old != null) {
+                return true;
+            }
+            return false;
+        }
+
+        public Iterator<StoredQueue> getPartitions() {
+            if (partitions == null) {
+                return null;
+            }
+
+            return partitions.values().iterator();
+        }
+
+        public AsciiBuffer getQueueName() {
+            return descriptor.getQueueName();
+        }
+
+        public QueueStore.QueueDescriptor getDescriptor() {
+            return descriptor;
+        }
+
+        public QueueQueryResultImpl query() {
+            QueueQueryResultImpl result = new QueueQueryResultImpl();
+            result.count = count;
+            result.size = size;
+            result.firstSequence = records.isEmpty() ? 0 : records.firstEntry().getValue().getQueueKey();
+            result.lastSequence = records.isEmpty() ? 0 : records.lastEntry().getValue().getQueueKey();
+            result.desc = descriptor.copy();
+            if (this.partitions != null) {
+                ArrayList<QueueQueryResult> childResults = new ArrayList<QueueQueryResult>(partitions.size());
+                for (StoredQueue child : partitions.values()) {
+                    childResults.add(child.query());
+                }
+                result.partitions = childResults;
+            }
+
+            return result;
+        }
+
     }
 
     static private class RemoveOp {
-        AsciiBuffer queue;
+        QueueStore.QueueDescriptor queue;
         Long messageKey;
 
-        public RemoveOp(AsciiBuffer queue, Long messageKey) {
+        public RemoveOp(QueueStore.QueueDescriptor queue, Long messageKey) {
             this.queue = queue;
             this.messageKey = messageKey;
         }
@@ -149,7 +245,7 @@
             adds.add(messageKey);
         }
 
-        public void removeMessage(AsciiBuffer queue, Long messageKey) {
+        public void removeMessage(QueueStore.QueueDescriptor queue, Long messageKey) {
             removes.add(new RemoveOp(queue, messageKey));
         }
     }
@@ -163,6 +259,42 @@
         }
     }
 
+    private static class QueueQueryResultImpl implements QueueQueryResult {
+
+        QueueStore.QueueDescriptor desc;
+        Collection<QueueQueryResult> partitions;
+        long size;
+        int count;
+        long firstSequence;
+        long lastSequence;
+
+        public QueueStore.QueueDescriptor getDescriptor() {
+            return desc;
+        }
+
+        public Collection<QueueQueryResult> getPartitions() {
+            return partitions;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        public long getFirstSequence() {
+            // Queue key of the first stored record; 0 when the queue held no records.
+            return firstSequence;
+        }
+
+        public long getLastSequence() {
+            // Queue key of the last stored record; 0 when the queue held no records.
+            return lastSequence;
+        }
+    }
+
     private class MemorySession implements Session {
 
         long streamSequence;
@@ -204,49 +336,114 @@
         // //////////////////////////////////////////////////////////////////////////////
         // Queue related methods.
         // ///////////////////////////////////////////////////////////////////////////////
-        public void queueAdd(AsciiBuffer queueName) {
-            StoredQueue queue = queues.get(queueName);
+        public void queueAdd(QueueStore.QueueDescriptor desc) throws KeyNotFoundException {
+            StoredQueue queue = queues.get(desc.getQueueName());
             if (queue == null) {
-                queue = new StoredQueue();
-                queues.put(queueName, queue);
+                queue = new StoredQueue(desc);
+                // Add to the parent:
+                AsciiBuffer parent = desc.getParent();
+
+                // The parent must already exist; a missing parent is reported as an error:
+                if (parent != null) {
+                    StoredQueue parentQueue = queues.get(parent);
+                    if (parentQueue == null) {
+                        throw new KeyNotFoundException("No parent " + parent + " for " + desc.getQueueName().toString());
+                    }
+
+                    parentQueue.addPartition(queue);
+                    queue.setParent(parentQueue);
+                }
+
+                // Add the queue:
+                queues.put(desc.getQueueName(), queue);
             }
         }
 
-        public void queueRemove(AsciiBuffer queueName) {
-            StoredQueue queue = queues.get(queueName);
+        public void queueRemove(QueueStore.QueueDescriptor desc) {
+            StoredQueue queue = queues.get(desc.getQueueName());
             if (queue != null) {
-                queues.remove(queueName);
+                // Remove message references:
+                for (QueueRecord record : queue.records.values()) {
+                    deleteMessageReference(record.getMessageKey());
+                }
+
+                // Remove parent reference:
+                StoredQueue parent = queue.getParent();
+                if (parent != null) {
+                    parent.removePartition(queue);
+                }
+
+                // Delete partitions
+                Iterator<StoredQueue> partitions = queue.getPartitions();
+                if (partitions != null) {
+                    while (partitions.hasNext()) {
+                        QueueStore.QueueDescriptor child = partitions.next().getDescriptor();
+                        queueRemove(child);
+                    }
+                }
+
+                // Remove the queue:
+                queues.remove(desc.getQueueName());
             }
         }
 
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
-            return list(queues, firstQueueName, max);
+        // Queue related methods.
+        public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueue, int max) {
+            return queueListInternal(firstQueue, type, max);
+        }
+
+        public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueue, int max) {
+            return queueListInternal(firstQueue, (short) -1, max);
         }
 
-        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
-            Long sequenceKey = get(queues, queueName).add(record);
+        private Iterator<QueueQueryResult> queueListInternal(QueueStore.QueueDescriptor firstQueue, short type, int max) {
+            Collection<StoredQueue> tailResults;
+            LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
+            if (firstQueue == null) {
+                tailResults = queues.values();
+            } else {
+                tailResults = queues.tailMap(firstQueue.getQueueName()).values();
+            }
+
+            for (StoredQueue sq : tailResults) {
+                if (max >=0 && results.size() >= max) {
+                    break;
+                }
+                if (type != -1 && sq.descriptor.getApplicationType() != type) {
+                    continue;
+                }
+                results.add(sq.query());
+            }
+
+            return results.iterator();
+        }
+
+        public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
+            get(queues, queue.getQueueName()).add(record);
             MessageRecordHolder holder = messages.get(record.getMessageKey());
             if (holder != null) {
                 holder.refs++;
             }
-            return sequenceKey;
+        }
 
+        public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long msgKey) throws KeyNotFoundException {
+            if (get(queues, queue.getQueueName()).remove(msgKey)) {
+                deleteMessageReference(msgKey);
+            }
         }
 
-        public void queueRemoveMessage(AsciiBuffer queueName, Long msgKey) throws KeyNotFoundException {
-            if (get(queues, queueName).remove(msgKey)) {
-                MessageRecordHolder holder = messages.get(msgKey);
-                if (holder != null) {
-                    holder.refs--;
-                    if (holder.refs <= 0) {
-                        messages.remove(msgKey);
-                    }
+        private void deleteMessageReference(Long msgKey) {
+            MessageRecordHolder holder = messages.get(msgKey);
+            if (holder != null) {
+                holder.refs--;
+                if (holder.refs <= 0) {
+                    messages.remove(msgKey);
                 }
             }
         }
 
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
-            return get(queues, queueName).list(firstQueueKey, max);
+        public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+            return get(queues, queue.getQueueName()).list(firstQueueKey, maxQueueKey, max);
         }
 
         // //////////////////////////////////////////////////////////////////////////////
@@ -337,7 +534,7 @@
             }
         }
 
-        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) throws KeyNotFoundException {
+        public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
             get(transactions, txid).removeMessage(queue, messageKey);
             MessageRecordHolder holder = messages.get(messageKey);
             if (holder != null) {
@@ -347,7 +544,6 @@
                 }
             }
         }
-
     }
 
     public void start() throws Exception {
@@ -401,7 +597,7 @@
 
     public void setDeleteAllMessages(boolean val) {
         // TODO Auto-generated method stub
-        
+
     }
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Mon Apr 27 18:40:44 2009
@@ -27,7 +27,8 @@
 
 import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
-import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java Mon Apr 27 18:40:44 2009
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.dispatch;
-
-import java.util.ArrayList;
-
-import org.apache.activemq.queue.Mapper;
-import org.apache.kahadb.util.LinkedNode;
-import org.apache.kahadb.util.LinkedNodeList;
-
-public class PriorityLinkedList<E extends LinkedNode<E>> {
-
-    private Mapper<Integer, E> priorityMapper;
-    private final ArrayList<LinkedNodeList<E>> priorityLists;
-    private int highesPriority = 0;
-
-    public PriorityLinkedList(int numPriorities) {
-        this(numPriorities, null);
-    }
-
-    public PriorityLinkedList(int numPriorities, Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-        priorityLists = new ArrayList<LinkedNodeList<E>>();
-        for (int i = 0; i <= numPriorities; i++) {
-            priorityLists.add(new LinkedNodeList<E>());
-        }
-    }
-
-    public final int getHighestPriority() {
-        return highesPriority;
-    }
-
-    /**
-     * Gets the element at the front of the list:
-     * 
-     * @return
-     */
-    public final E poll() {
-        LinkedNodeList<E> ll = getHighestPriorityList();
-        if (ll == null) {
-            return null;
-        }
-        E node = ll.getHead();
-        node.unlink();
-
-        return node;
-    }
-
-    public final boolean isEmpty() {
-        return peek() != null;
-    }
-
-    /**
-     * Gets the element at the front of the list:
-     * 
-     * @return
-     */
-    public final E peek() {
-        LinkedNodeList<E> ll = getHighestPriorityList();
-        if (ll == null) {
-            return null;
-        }
-
-        return ll.getHead();
-    }
-
-    public final void add(E element) {
-        int prio = priorityMapper.map(element);
-        add(element, prio);
-    }
-
-    public final void add(E element, int prio) {
-        LinkedNodeList<E> ll = priorityLists.get(prio);
-        ll.addLast(element);
-        if (prio > highesPriority) {
-            highesPriority = prio;
-        }
-    }
-
-    private final LinkedNodeList<E> getHighestPriorityList() {
-        LinkedNodeList<E> ll = priorityLists.get(highesPriority);
-        while (ll.isEmpty()) {
-            if (highesPriority == 0) {
-                return null;
-            }
-            highesPriority--;
-            ll = priorityLists.get(highesPriority);
-        }
-
-        return ll;
-    }
-
-    public Mapper<Integer, E> getPriorityMapper() {
-        return priorityMapper;
-    }
-
-    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-    }
-
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java Mon Apr 27 18:40:44 2009
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.dispatch;
-
-import java.util.Arrays;
-
-public class PriorityMap<E> {
-
-    int first;
-    int base;
-    int size;
-
-    Object elements[] = new Object[1];
-
-    public E put(int key, E value) {
-        E rc = null;
-        if (isEmpty()) {
-            // This will be the first base prioritu..
-            base = key;
-            elements[0] = value;
-            first = 0;
-        } else {
-            if (key > base) {
-                // New priority is after the current base, we may need to
-                // expaned the
-                // priority array to fit this new one in.
-                int index = key - base;
-                if (elements.length <= index) {
-                    // The funky thing is if the original base was removed,
-                    // resizing
-                    // will rebase the at the first.
-                    resize(index + 1, 0);
-                }
-                if (index < first) {
-                    first = index;
-                }
-                rc = element(index);
-                elements[index] = value;
-            } else {
-                // Ok this element is before the current base so we need to
-                // resize/rebase
-                // using this element as the base.
-                int oldLastIndex = indexOfLast();
-                int newLastIndex = (base + oldLastIndex) - key;
-                resize(newLastIndex + 1, first + (base - key), (oldLastIndex - first) + 1);
-                elements[0] = value;
-                first = 0;
-            }
-        }
-        if (rc == null) {
-            size++;
-        }
-        return rc;
-    }
-
-    private int indexOfLast() {
-        int i = elements.length - 1;
-        while (i >= 0) {
-            if (elements[i] != null) {
-                return i;
-            }
-            i--;
-        }
-        return -1;
-    }
-
-    private void resize(int newSize, int firstOffset) {
-        int count = Math.min(elements.length - first, newSize);
-        resize(newSize, firstOffset, count);
-    }
-
-    private void resize(int newSize, int firstOffset, int copyCount) {
-        Object t[];
-        if (elements.length == newSize) {
-            t = elements;
-            System.arraycopy(elements, first, t, firstOffset, copyCount);
-            Arrays.fill(t, 0, firstOffset, null);
-        } else {
-            t = new Object[newSize];
-            System.arraycopy(elements, first, t, firstOffset, copyCount);
-        }
-        base += (first - firstOffset);
-        elements = t;
-    }
-
-    public E get(int priority) {
-        int index = priority - base;
-        if (index < 0 || index >= elements.length) {
-            return null;
-        }
-        return element(index);
-    }
-
-    @SuppressWarnings("unchecked")
-    private E element(int index) {
-        return (E) elements[index];
-    }
-
-    public E remove(int priority) {
-        int index = priority - base;
-        if (index < 0 || index >= elements.length) {
-            return null;
-        }
-        E rc = element(index);
-        elements[index] = null;
-        if (rc != null) {
-            size--;
-        }
-        return rc;
-    }
-
-    public boolean isEmpty() {
-        return size == 0;
-    }
-
-    public E firstValue() {
-        if (size == 0) {
-            return null;
-        }
-        E rc = element(first);
-        while (rc == null) {
-            // The first element may have been removed so we need to find it...
-            first++;
-            rc = element(first);
-        }
-        return (E) rc;
-    }
-
-    public Integer firstKey() {
-        if (size == 0) {
-            return null;
-        }
-        E rc = element(first);
-        while (rc == null) {
-            // The first element may have been removed so we need to find it...
-            first++;
-            rc = element(first);
-        }
-        return first;
-    }
-
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Mon Apr 27 18:40:44 2009
@@ -23,7 +23,7 @@
 
 public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
 
-    private final boolean DEBUG = true;
+    private final boolean DEBUG = false;
 
     public SimpleLoadBalancer() {
     }
@@ -59,10 +59,10 @@
     }
     
     public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
-        return new SimpleExecutionTracker<D>(context);
+        return new SimpleExecutionTracker(context);
     }
 
-    private class SimpleExecutionTracker<D extends IDispatcher> implements ExecutionTracker<D> {
+    private class SimpleExecutionTracker implements ExecutionTracker<D> {
         private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
         private final PooledDispatchContext<D> context;
         private final AtomicInteger work = new AtomicInteger(0);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Mon Apr 27 18:40:44 2009
@@ -94,7 +94,10 @@
         this.flow = flow;
         this.limiter = limiter == null ? new SizeLimiter<E>(0, 0) : limiter;
         this.mutex = mutex;
-        this.name = controllable.toString();
+        if(controllable != null)
+        {
+            this.name = controllable.toString();
+        }
     }
 
     public final IFlowLimiter<E> getLimiter() {
@@ -177,12 +180,6 @@
         }
     }
 
-    public final boolean isSourceBlocked() {
-        synchronized (mutex) {
-            return blocked;
-        }
-    }
-
     /**
      * Waits for a flow to become unblocked.
      * 
@@ -203,7 +200,11 @@
     }
 
     public IFlowResource getFlowResource() {
-        return controllable.getFlowResource();
+        if (controllable != null) {
+            return controllable.getFlowResource();
+        } else {
+            return null;
+        }
     }
 
     /**
@@ -231,7 +232,7 @@
                     blockSource(sourceController);
                 }
             } else {
-                // Add to overflow queue and block source:
+                // Add to overflow queue and restoreBlock source:
                 overflowQueue.add(elem);
                 if (sourceController != null) {
                     blockSource(sourceController);
@@ -325,7 +326,8 @@
         setUnThrottleListener();
 
         if (!blockedSources.contains(source)) {
-//            System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" + source + "]");
+            // System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" +
+            // source + "]");
             blockedSources.add(source);
             source.onFlowBlock(this);
         }
@@ -389,7 +391,9 @@
                     }
                     try {
                         for (ISourceController<?> source : blockedSources) {
-//                            System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
+                            // System.out.println("UNBLOCKING: SINK[" +
+                            // FlowController.this + "], SOURCE[" + source +
+                            // "]");
                             source.onFlowResume(FlowController.this);
                         }
                         for (FlowUnblockListener<E> listener : unblockListeners) {
@@ -423,10 +427,6 @@
         return name;
     }
 
-    public IFlowResource getFlowSink() {
-        return controllable.getFlowResource();
-    }
-
     public void setExecutor(Executor executor) {
         synchronized (mutex) {
             this.executor = executor;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java Mon Apr 27 18:40:44 2009
@@ -21,7 +21,7 @@
 public interface IFlowSink<E> extends IFlowResource {
     /**
      * Adds an element to the sink. If limiter space in the sink is overflowed
-     * by the element then it will block the source controller.
+     * by the element then it will restoreBlock the source controller.
      * 
      * @param elem
      *            The element to add to the sink.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java Mon Apr 27 18:40:44 2009
@@ -54,8 +54,6 @@
      */
     public void onFlowResume(ISinkController<?> sinkController);
 
-    public boolean isSourceBlocked();
-
     /**
      * Must be called once the elements have been sent to downstream sinks.
      * 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Mon Apr 27 18:40:44 2009
@@ -92,6 +92,6 @@
     }
 
     public void setExecutor(Executor executor) {
-        // Don't need an executor since we don't block.
+        // Don't need an executor since we don't block.
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java Mon Apr 27 18:40:44 2009
@@ -18,7 +18,7 @@
 
 import java.util.ArrayList;
 
-import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.util.Mapper;
 
 public class PrioritySizeLimiter<E> {
 
@@ -38,7 +38,7 @@
 
     private Mapper<Integer, E> priorityMapper = sizeMapper;
 
-    private class Priority extends AbstractLimiter<E> {
+    private class Priority extends AbstractLimiter<E> implements IFlowSizeLimiter<E> {
         final int priority;
         int size;
         int reserved;
@@ -48,8 +48,15 @@
             this.priority = priority;
         }
 
-        public boolean add(E elem) {
-            int elementSize = sizeMapper.map(elem);
+        public long getSize() {
+            return size;
+        }
+
+        public long getCapacity() {
+            return capacity;
+        }
+
+        public boolean add(int count, long elementSize) {
             totalSize += elementSize;
             size += elementSize;
             if (totalSize >= capacity) {
@@ -64,6 +71,10 @@
             return throttled;
         }
 
+        public boolean add(E elem) {
+            return add(1, sizeMapper.map(elem));
+        }
+
         public boolean canAdd(E elem) {
             if (throttled)
                 return false;
@@ -135,6 +146,18 @@
         public void reserve(E elem) {
             reserved += sizeMapper.map(elem);
         }
+
+        public int getElementSize(E elem) {
+            return sizeMapper.map(elem);
+        }
+    }
+
+    public long getCapacity() {
+        return capacity;
+    }
+    
+    public long getSize() {
+        return totalSize;
     }
 
     public PrioritySizeLimiter(int capacity, int resumeThreshold, int priorities) {
@@ -145,7 +168,7 @@
         }
     }
 
-    public IFlowLimiter<E> getPriorityLimter(int priority) {
+    public IFlowSizeLimiter<E> getPriorityLimter(int priority) {
         return priorities.get(priority);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Mon Apr 27 18:40:44 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.flow;
 
-public class SizeLimiter<E> extends AbstractLimiter<E> {
+public class SizeLimiter<E> extends AbstractLimiter<E> implements IFlowSizeLimiter<E> {
 
     protected long capacity;
     protected long resumeThreshold;
@@ -32,8 +32,11 @@
     }
 
     public boolean add(E elem) {
-        this.size += getElementSize(elem);
+        return add(1, getElementSize(elem));
+    }
 
+    public boolean add(int count, long size) {
+        this.size += size;
         if (this.size >= capacity) {
             throttled = true;
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Mon Apr 27 18:40:44 2009
@@ -27,7 +27,6 @@
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.protobuf.AsciiBuffer;
 
 /**
  * Base class for a {@link Dispatchable} {@link FlowControllable}
@@ -35,7 +34,7 @@
  * 
  * @param <E>
  */
-public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements PersistentQueue<E>, FlowControllable<E>, IFlowQueue<E>, Dispatchable {
+public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
 
     protected IDispatcher dispatcher;
     protected DispatchContext dispatchContext;
@@ -43,7 +42,6 @@
     private boolean notifyReady = false;
     protected boolean dispatching = false;
     protected int dispatchPriority = 0;
-    protected QueueStoreHelper<E> storeHelper;
     protected FlowQueueListener listener = new FlowQueueListener()
     {
         public void onQueueException(IFlowQueue<?> queue, Throwable thrown) {
@@ -52,8 +50,6 @@
         }
     };
     
-    AsciiBuffer persistentQueueName;
-
     AbstractFlowQueue() {
         super();
     }
@@ -67,28 +63,11 @@
     }
 
     public final void add(E elem, ISourceController<?> source) {
-        checkSave(elem, source);
         getSinkController(elem, source).add(elem, source);
     }
 
     public final boolean offer(E elem, ISourceController<?> source) {
-        if (getSinkController(elem, source).offer(elem, source)) {
-            checkSave(elem, source);
-            return true;
-        }
-        return false;
-    }
-
-    private final void checkSave(E elem, ISourceController<?> source) {
-        //TODO This is currently handled externally to the queue
-        //but it would be nice to move it in here
-        /*if (storeHelper != null && isElementPersistent(elem)) {
-            try {
-                storeHelper.save(elem, true);
-            } catch (IOException e) {
-                listener.onQueueException(this, e);
-            }
-        }*/
+        return getSinkController(elem, source).offer(elem, source);
     }
 
     protected abstract ISinkController<E> getSinkController(E elem, ISourceController<?> source);
@@ -184,53 +163,8 @@
         notifyReady = false;
     }
 
-    /**
-     * Enables persistence for this queue.
-     */
-    public void enablePersistence(QueueStoreHelper<E> storeHelper) {
-        this.storeHelper = storeHelper;
-    }
-
-    /**
-     * Called when an element is added from the queue's store.
-     * 
-     * @param elem
-     *            The element
-     * @param controller
-     *            The store controller.
-     */
-    public void addFromStore(E elem, ISourceController<?> controller) {
-        add(elem, controller);
-    }
-
-    /**
-     * Subclasses should override this if they require persistence requires
-     * saving to the store.
-     * 
-     * @param elem
-     *            The element to check.
-     */
-    public boolean isElementPersistent(E elem) {
-        return false;
-    }
-
     public String toString() {
         return getResourceName();
     }
 
-    /**
-     * Returns the queue name used to indentify the queue in the store
-     * 
-     * @return
-     */
-    public AsciiBuffer getPeristentQueueName() {
-        if (persistentQueueName == null) {
-            String name = getResourceName();
-            if (name != null) {
-                persistentQueueName = new AsciiBuffer(name);
-            }
-        }
-        return persistentQueueName;
-    }
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Mon Apr 27 18:40:44 2009
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.PriorityLinkedList;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PriorityFlowController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
+import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.kahadb.util.LinkedNode;
 
 /**

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Mon Apr 27 18:40:44 2009
@@ -16,18 +16,43 @@
  */
 package org.apache.activemq.queue;
 
+import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.IFlowSink;
 
-public interface IQueue<K, V> extends IFlowSink<V>, PersistentQueue<V>{
+public interface IQueue<K, V> extends IFlowSink<V>{
 
+    /**
+     * Called to initialize the queue with values from the queue store.
+     * 
+     * @param sequenceMin
+     *            The lowest sequence number in the store.
+     * @param sequenceMax
+     *            The max sequence number in the store.
+     * @param count
+     *            The number of messages in the queue
+     * @param size
+     *            The size of the messages in the queue
+     */
+    public void initialize(long sequenceMin, long sequenceMax, int count, long size);
+    
+    public QueueStore.QueueDescriptor getDescriptor();
+    
+    public int getEnqueuedCount();
+    
+    public long getEnqueuedSize();
+    
     public void addSubscription(Subscription<V> sub);
 
     public boolean removeSubscription(Subscription<V> sub);
-
-    public boolean removeByValue(V value);
-
-    public boolean removeByKey(K key);
     
-    public void setStore(Store<K, V> store);
-
+    public void setStore(QueueStore<K, V> store);
+    
+    public void setDispatcher(IDispatcher dispatcher);
+    
+    public void start();
+    
+    public void stop();
+    
+    
+ 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Mon Apr 27 18:40:44 2009
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.ISinkController;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.kahadb.util.LinkedNode;
-import org.apache.kahadb.util.LinkedNodeList;
-
-/**
- */
-public class LoadBalancedFlowQueue<E> extends AbstractFlowQueue<E> {
-    private final LinkedList<E> queue = new LinkedList<E>();
-    private final LinkedNodeList<SinkNode> readyConsumers = new LinkedNodeList<SinkNode>();
-    private final HashMap<IFlowSink<E>, SinkNode> consumers = new HashMap<IFlowSink<E>, SinkNode>();
-
-    private boolean strcitDispatch = true;
-
-    private final FlowController<E> sinkController;
-
-    private final ISourceController<E> sourceControler = new ISourceController<E>() {
-
-        public Flow getFlow() {
-            return sinkController.getFlow();
-        }
-
-        public IFlowResource getFlowResource() {
-            return LoadBalancedFlowQueue.this;
-        }
-
-        public void onFlowBlock(ISinkController<?> sink) {
-            synchronized (LoadBalancedFlowQueue.this) {
-                SinkNode node = consumers.get(sink);
-                if (node != null) {
-                    node.unlink();
-                }
-                // controller.onFlowBlock(sink);
-            }
-
-        }
-
-        public void onFlowResume(ISinkController<?> sink) {
-            synchronized (LoadBalancedFlowQueue.this) {
-                SinkNode node = consumers.get(sink);
-                if (node != null) {
-                    // controller.onFlowResume(sink);
-                    // Add to ready list if not there:
-                    if (!node.isLinked()) {
-                        boolean notify = false;
-                        if (readyConsumers.isEmpty()) {
-                            notify = true;
-                        }
-
-                        readyConsumers.addLast(node);
-                        if (notify && !queue.isEmpty()) {
-                            notifyReady();
-                        }
-                    }
-                }
-            }
-        }
-
-        public void elementDispatched(E elem) {
-            // TODO Auto-generated method stub
-
-        }
-
-        public boolean isSourceBlocked() {
-            // TODO Auto-generated method stub
-            return false;
-        }
-
-    };
-
-    /**
-     * Creates a flow queue that can handle multiple flows.
-     * 
-     * @param flow
-     *            The {@link Flow}
-     * @param controller
-     *            The FlowController if this queue is flow controlled:
-     */
-    public LoadBalancedFlowQueue(Flow flow, String name, long resourceId, IFlowLimiter<E> limiter) {
-        super(name);
-        this.sinkController = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
-        super.onFlowOpened(sinkController);
-    }
-
-    protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
-        return sinkController;
-    }
-    
-    /**
-     * Called when the controller accepts a message for this queue.
-     */
-    public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
-        queue.add(elem);
-        if (!readyConsumers.isEmpty()) {
-            notifyReady();
-        }
-    }
-
-    public FlowController<E> getFlowController(Flow flow) {
-        return sinkController;
-    }
-
-    public boolean isDispatchReady() {
-        return !queue.isEmpty() && !readyConsumers.isEmpty();
-    }
-
-    public boolean pollingDispatch() {
-        if (strcitDispatch) {
-            return strictPollingDispatch();
-        } else {
-            return loosePollingDispatch();
-        }
-    }
-
-    private boolean strictPollingDispatch() {
-
-        SinkNode node = null;
-        E elem = null;
-        synchronized (this) {
-            if (readyConsumers.isEmpty()) {
-                return false;
-            }
-            // Get the next elem:
-            elem = queue.peek();
-            if (elem == null) {
-                return false;
-            }
-
-            node = readyConsumers.getHead();
-        }
-
-        while (true) {
-
-            boolean accepted = node.sink.offer(elem, sourceControler);
-
-            synchronized (this) {
-                if (accepted) {
-                    queue.remove();
-                    if (autoRelease) {
-                        sinkController.elementDispatched(elem);
-                    }
-                    if (!readyConsumers.isEmpty()) {
-                        readyConsumers.rotate();
-                    }
-                    return true;
-                } else {
-                    if (readyConsumers.isEmpty()) {
-                        return false;
-                    }
-                    node = readyConsumers.getHead();
-                }
-            }
-        }
-    }
-
-    private boolean loosePollingDispatch() {
-        E elem = null;
-        IFlowSink<E> sink = null;
-        synchronized (this) {
-            if (readyConsumers.isEmpty()) {
-                return false;
-            }
-
-            // Get the next sink:
-            sink = readyConsumers.getHead().sink;
-
-            // Get the next elem:
-            elem = poll();
-
-            readyConsumers.rotate();
-        }
-
-        sink.add(elem, sourceControler);
-        return true;
-    }
-
-    public final E poll() {
-        synchronized (this) {
-            E elem = queue.poll();
-            // FIXME the release should really be done after dispatch.
-            // doing it here saves us from having to resynchronize
-            // after dispatch, but release limiter space too soon.
-            if (elem != null) {
-                if (autoRelease) {
-                    sinkController.elementDispatched(elem);
-                }
-                return elem;
-            }
-            return null;
-        }
-    }
-
-    public final void addSink(IFlowSink<E> sink) {
-        synchronized (this) {
-            SinkNode node = consumers.get(sink);
-            if (node == null) {
-                node = new SinkNode(sink);
-                consumers.put(sink, node);
-                readyConsumers.addLast(node);
-                if (!queue.isEmpty()) {
-                    notifyReady();
-                }
-            }
-        }
-    }
-
-    private class SinkNode extends LinkedNode<SinkNode> {
-        public final IFlowSink<E> sink;
-
-        public SinkNode(IFlowSink<E> sink) {
-            this.sink = sink;
-        }
-
-        @Override
-        public String toString() {
-            return sink.toString();
-        }
-    }
-
-    public boolean isStrcitDispatch() {
-        return strcitDispatch;
-    }
-
-    public void setStrcitDispatch(boolean strcitDispatch) {
-        this.strcitDispatch = strcitDispatch;
-    }
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java Mon Apr 27 18:40:44 2009
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-public interface Mapper<K, V> {
-    K map(V element);
-}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java Mon Apr 27 18:40:44 2009
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.kahadb.util.LinkedNode;
-import org.apache.kahadb.util.LinkedNodeList;
-
-public class MemoryStore<K, V> implements Store<K, V> {
-
-    AtomicLong counter = new AtomicLong();
-
-    class MemoryStoreNode extends LinkedNode<MemoryStoreNode> implements StoreNode<K, V> {
-        private Subscription<V> owner;
-        private final K key;
-        private final V value;
-        private long id = counter.getAndIncrement();
-
-        public MemoryStoreNode(K key, V value) {
-            this.key = key;
-            this.value = value;
-        }
-
-        public boolean acquire(Subscription<V> owner) {
-            if (this.owner == null) {
-                this.owner = owner;
-            }
-            return true;
-        }
-
-        public K getKey() {
-            return key;
-        }
-
-        public V getValue() {
-            return value;
-        }
-
-        @Override
-        public String toString() {
-            return "node:" + id;
-        }
-
-        public void unacquire() {
-            this.owner = null;
-        }
-
-    }
-
-    class MemoryStoreCursor implements StoreCursor<K, V> {
-        private MemoryStoreNode last;
-        private MemoryStoreNode next;
-
-        public MemoryStoreCursor() {
-        }
-
-        public MemoryStoreCursor(MemoryStoreNode next) {
-            this.next = next;
-        }
-
-        public void setNext(StoreNode<K, V> next) {
-            this.next = (MemoryStoreNode) next;
-        }
-
-        public boolean hasNext() {
-            if (next != null)
-                return true;
-
-            if (last == null || last.getNextCircular() == last) {
-                next = (MemoryStoreNode) elements.getHead();
-                return next != null;
-            }
-
-            while (true) {
-                MemoryStoreNode t = last.getNextCircular();
-                if (t.id > last.id) {
-                    next = t;
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        }
-
-        public StoreNode<K, V> peekNext() {
-            hasNext();
-            return next;
-        }
-
-        public StoreNode<K, V> next() {
-            try {
-                hasNext();
-                return next;
-            } finally {
-                last = next;
-                next = null;
-            }
-        }
-
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-
-    protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
-    protected LinkedNodeList<MemoryStoreNode> elements = new LinkedNodeList<MemoryStoreNode>();
-
-    public StoreNode<K, V> add(K key, V value) {
-        MemoryStoreNode rc = new MemoryStoreNode(key, value);
-        map.put(key, rc);
-        elements.addLast(rc);
-        return rc;
-    }
-
-    public StoreNode<K, V> remove(K key) {
-        MemoryStoreNode node = (MemoryStoreNode) map.remove(key);
-        if (node != null) {
-            node.unlink();
-        }
-        return node;
-    }
-
-    public boolean isEmpty() {
-        return elements.isEmpty();
-    }
-
-    public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursor() {
-        MemoryStoreCursor cursor = new MemoryStoreCursor();
-        return cursor;
-    }
-
-    public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursorAt(org.apache.activemq.queue.Store.StoreNode<K, V> next) {
-        MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next);
-        return cursor;
-    }
-
-    public int size() {
-        return map.size();
-    }
-
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Mon Apr 27 18:40:44 2009
@@ -20,37 +20,119 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
+import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.util.Mapper;
 
-abstract public class PartitionedQueue<P, K, V> extends AbstractLimitedFlowResource<V> implements IQueue<K, V> {
+abstract public class PartitionedQueue<K, V> extends AbstractLimitedFlowResource<V> implements IPartitionedQueue<K, V> {
 
     private HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
-    private HashMap<P, IQueue<K, V>> partitions = new HashMap<P, IQueue<K, V>>();
-    private Mapper<P, V> partitionMapper;
-    private Store<K, V> store;
-    private AsciiBuffer queueName;
-
-    public IQueue<K, V> getPartition(P partitionKey) {
+    private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
+    protected Mapper<Integer, V> partitionMapper;
+    private QueueStore<K, V> store;
+    protected IDispatcher dispatcher;
+    private boolean started;
+    protected QueueStore.QueueDescriptor queueDescriptor;
+
+    public PartitionedQueue(String name) {
+        super(name);
+        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor.setQueueName(new AsciiBuffer(getResourceName()));
+        queueDescriptor.setQueueType(QueueDescriptor.PARTITIONED);
+    }
+
+    public QueueStore.QueueDescriptor getDescriptor() {
+        return queueDescriptor;
+    }
+
+    public IQueue<K, V> getPartition(int partitionKey) {
+        boolean save = false;
+        IQueue<K, V> rc = null;
         synchronized (partitions) {
-            IQueue<K, V> rc = partitions.get(partitionKey);
+            rc = partitions.get(partitionKey);
             if (rc == null) {
-                rc = cratePartition(partitionKey);
+                rc = createPartition(partitionKey);
                 partitions.put(partitionKey, rc);
                 for (Subscription<V> sub : subscriptions) {
                     rc.addSubscription(sub);
                 }
             }
-            return rc;
+        }
+        if (save) {
+            store.addQueue(rc.getDescriptor());
+        }
+
+        return rc;
+    }
+
+    public int getEnqueuedCount() {
+        synchronized (partitions) {
+            int count = 0;
+            for (IQueue<K, V> queue : partitions.values()) {
+                if (queue != null) {
+                    count += queue.getEnqueuedCount();
+                }
+            }
+            return count;
+        }
+    }
+
+    public synchronized long getEnqueuedSize() {
+        synchronized (partitions) {
+            long size = 0;
+            for (IQueue<K, V> queue : partitions.values()) {
+                if (queue != null) {
+                    size += queue.getEnqueuedSize();
+                }
+            }
+            return size;
         }
     }
 
-    public void setStore(Store<K, V> store) {
+    public void setStore(QueueStore<K, V> store) {
         this.store = store;
     }
 
-    abstract protected IQueue<K, V> cratePartition(P partitionKey);
+    abstract public IQueue<K, V> createPartition(int partitionKey);
+
+    public void addPartition(int partitionKey, IQueue<K, V> queue) {
+        synchronized (partitions) {
+            partitions.put(partitionKey, queue);
+            for (Subscription<V> sub : subscriptions) {
+                queue.addSubscription(sub);
+            }
+        }
+    }
+
+    public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+        // Nothing to restore here: only partitions hold stored values, so a non-zero count or size is rejected.
+        if (count > 0 || size > 0) {
+            throw new IllegalArgumentException("Partioned queues do not themselves hold values");
+        }
+    }
+
+    public synchronized void start() {
+        if (!started) {
+            started = true;
+            for (IQueue<K, V> partition : partitions.values()) {
+                if (partition != null)
+                    partition.start();
+            }
+        }
+    }
+
+    public synchronized void stop() {
+        if (started) {
+            started = false;
+            for (IQueue<K, V> partition : partitions.values()) {
+                if (partition != null)
+                    partition.stop();
+            }
+        }
+    }
 
     public void addSubscription(Subscription<V> sub) {
         synchronized (partitions) {
@@ -75,53 +157,33 @@
         return false;
     }
 
-    public boolean removeByKey(K key) {
-        synchronized (partitions) {
-            Collection<IQueue<K, V>> values = partitions.values();
-            for (IQueue<K, V> queue : values) {
-                if (queue.removeByKey(key)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    public boolean removeByValue(V value) {
-        P partitionKey = partitionMapper.map(value);
-        IQueue<K, V> partition = getPartition(partitionKey);
-        return partition.removeByValue(value);
-    }
-
-    public void setPartitionMapper(Mapper<P, V> partitionMapper) {
+    public void setPartitionMapper(Mapper<Integer, V> partitionMapper) {
         this.partitionMapper = partitionMapper;
     }
 
-    public Mapper<P, V> getPartitionMapper() {
+    public Mapper<Integer, V> getPartitionMapper() {
         return partitionMapper;
     }
 
     public void add(V value, ISourceController<?> source) {
-        P partitionKey = partitionMapper.map(value);
+        int partitionKey = partitionMapper.map(value);
         IQueue<K, V> partition = getPartition(partitionKey);
         partition.add(value, source);
     }
 
     public boolean offer(V value, ISourceController<?> source) {
-        P partitionKey = partitionMapper.map(value);
+        int partitionKey = partitionMapper.map(value);
         IQueue<K, V> partition = getPartition(partitionKey);
         return partition.offer(value, source);
     }
 
-    public void addFromStore(V elem, ISourceController<?> controller) {
-        throw new UnsupportedOperationException();
-
-    }
-
-    public AsciiBuffer getPeristentQueueName() {
-        if (queueName == null) {
-            queueName = new AsciiBuffer(getResourceName());
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+        synchronized (partitions) {
+            Collection<IQueue<K, V>> values = partitions.values();
+            for (IQueue<K, V> queue : values) {
+                queue.setDispatcher(dispatcher);
+            }
         }
-        return queueName;
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java Mon Apr 27 18:40:44 2009
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-import java.io.IOException;
-
-public interface QueueStoreHelper<E> {
-
-    /**
-     * Requests that the helper start loading elements
-     * saved for this queue. 
-     * @param queue
-     */
-    public void startLoadingQueue();
-    
-    /**
-     * Stop the helper from loading more elements stored for 
-     * the queue
-     * @param queue The queue.
-     */
-    public void stopLoadingQueue();
-    
-    /**
-     * Checks to see if there are elements in the store 
-     * for this queue. 
-     * @param queue
-     */
-    public boolean hasStoredElements();
-    
-    /**
-     * Deletes a given element for this queue from the store. 
-     * @param elem The elem to delete. 
-     */
-    public void delete(E elem, boolean flush);
-    
-    /**
-     * Saves an element to the store. 
-     * @param elem The element to be saved. 
-     * @throws IOException 
-     */
-    public void save(E elem, boolean flush) throws IOException;
-    
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Mon Apr 27 18:40:44 2009
@@ -24,8 +24,10 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.util.Mapper;
 
-public class SharedPriorityQueue<K, V> extends AbstractLimitedFlowResource<V> implements IQueue<K, V> {
+public class SharedPriorityQueue<K, V> extends AbstractLimitedFlowResource<V> implements IPartitionedQueue<K, V> {
 
     private final HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
     private final Mapper<Integer, V> priorityMapper;
@@ -34,10 +36,15 @@
     private boolean autoRelease;
     private IDispatcher dispatcher;
     private final PrioritySizeLimiter<V> limiter;
-    private Store<K, V> store;
+    private QueueStore<K, V> store;
+    private boolean started;
+    private QueueStore.QueueDescriptor queueDescriptor;
 
     public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
         super(name);
+        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
+        queueDescriptor.setQueueType(QueueDescriptor.SHARED_PRIORITY);
         this.limiter = limiter;
         priorityMapper = limiter.getPriorityMapper();
         for (int i = 0; i < limiter.getPriorities(); i++) {
@@ -45,7 +52,48 @@
         }
     }
 
-    public void setStore(Store<K, V> store) {
+    public synchronized void start() {
+        if (!started) {
+            started = true;
+            for (SharedQueue<K, V> partition : partitions) {
+                if (partition != null)
+                    partition.start();
+            }
+        }
+    }
+
+    public synchronized void stop() {
+        if (started) {
+            started = false;
+            for (SharedQueue<K, V> partition : partitions) {
+                if (partition != null)
+                    partition.stop();
+            }
+        }
+    }
+
+    public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+        // Nothing to restore here: only partitions hold stored values, so a positive count or size is rejected.
+        if (count > 0 || size > 0) {
+            throw new IllegalArgumentException("Partioned queues do not themselves hold values");
+        }
+    }
+
+    public synchronized int getEnqueuedCount() {
+        int count = 0;
+        for (SharedQueue<K, V> queue : partitions) {
+            if (queue != null) {
+                count += queue.getEnqueuedCount();
+            }
+        }
+        return count;
+    }
+
+    public synchronized long getEnqueuedSize() {
+        return limiter.getSize();
+    }
+
+    public void setStore(QueueStore<K, V> store) {
         this.store = store;
     }
 
@@ -78,35 +126,31 @@
         return false;
     }
 
-    public boolean removeByKey(K key) {
-        synchronized (this) {
-            for (SharedQueue<K, V> queue : partitions) {
-                if (queue.removeByKey(key)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    public boolean removeByValue(V value) {
-        int prio = priorityMapper.map(value);
-        IQueue<K, V> partition = getPartition(prio);
-        return partition.removeByValue(value);
+    public IQueue<K, V> createPartition(int prio) {
+        return getPartition(prio, false);
     }
 
-    private IQueue<K, V> getPartition(int prio) {
+    private IQueue<K, V> getPartition(int prio, boolean initialize) {
         synchronized (this) {
             SharedQueue<K, V> queue = partitions.get(prio);
             if (queue == null) {
-                queue = new SharedQueue<K, V>(getResourceName() + ":" + prio, limiter.getPriorityLimter(prio), this);
+                queue = new SharedQueue<K, V>(getResourceName() + "$" + prio, limiter.getPriorityLimter(prio), this);
                 queue.setAutoRelease(autoRelease);
                 queue.setDispatcher(dispatcher);
                 queue.setDispatchPriority(prio);
                 queue.setKeyMapper(keyMapper);
                 queue.setStore(store);
+                queue.getDescriptor().setParent(queueDescriptor.getQueueName());
+                queue.getDescriptor().setPartitionId(prio);
                 partitions.set(prio, queue);
                 onFlowOpened(queue.getFlowControler());
+                if (initialize) {
+                    store.addQueue(queue.getDescriptor());
+                    queue.initialize(0, 0, 0, 0);
+                }
+                if (started) {
+                    queue.start();
+                }
 
                 for (Subscription<V> sub : subscriptions) {
                     queue.addSubscription(sub);
@@ -117,15 +161,19 @@
         }
     }
 
+    public QueueStore.QueueDescriptor getDescriptor() {
+        return queueDescriptor;
+    }
+
     public void add(V value, ISourceController<?> source) {
         int prio = priorityMapper.map(value);
-        IQueue<K, V> partition = getPartition(prio);
+        IQueue<K, V> partition = getPartition(prio, true);
         partition.add(value, source);
     }
 
     public boolean offer(V value, ISourceController<?> source) {
         int prio = priorityMapper.map(value);
-        IQueue<K, V> partition = getPartition(prio);
+        IQueue<K, V> partition = getPartition(prio, true);
         return partition.offer(value, source);
     }
 
@@ -139,19 +187,6 @@
 
     public void setDispatcher(IDispatcher dispatcher) {
         this.dispatcher = dispatcher;
-    }
-
-    public void addFromStore(V elem, ISourceController<?> controller) {
-        // TODO Auto-generated method stub
-
-    }
-
-    public AsciiBuffer getPeristentQueueName() {
-        // TODO Auto-generated method stub
-        return new AsciiBuffer(this.getResourceName());
-    }
-
-    public boolean isElementPersistent(V elem) {
-        return false;
+        super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
     }
 }



Mime
View raw message