activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r769099 [4/5] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Mon, 27 Apr 2009 18:40:49 GMT
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Mon Apr 27 18:40:44 2009
@@ -16,401 +16,1085 @@
  */
 package org.apache.activemq.queue;
 
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSizeLimiter;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.Store.StoreCursor;
-import org.apache.activemq.queue.Store.StoreNode;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore.RestoreListener;
+import org.apache.activemq.queue.QueueStore.RestoredElement;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.SortedLinkedList;
+import org.apache.activemq.util.SortedLinkedListNode;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
 /**
- * IQueue which does direct dispatch whenever it can.
+ * A SharedMessageQueue.
+ * 
+ * @author cmacnaug
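+ * @param <K> the key type that the key mapper associates with elements
+ * @param <V> the type of element held in this queue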
  */
 public class SharedQueue<K, V> extends AbstractFlowQueue<V> implements IQueue<K, V> {
 
-    protected Store<K, V> store = new TreeMemoryStore<K, V>();
+    private static final boolean DEBUG = false;
 
-    private final LinkedNodeList<SubscriptionNode> unreadyDirectSubs = new LinkedNodeList<SubscriptionNode>();
-    private final LinkedNodeList<SubscriptionNode> readyDirectSubs = new LinkedNodeList<SubscriptionNode>();
-
-    private final LinkedNodeList<SubscriptionNode> unreadyPollingSubs = new LinkedNodeList<SubscriptionNode>();
-    private final LinkedNodeList<SubscriptionNode> readyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+    private final Object mutex;
 
-    private final HashMap<Subscription<V>, SubscriptionNode> subscriptions = new HashMap<Subscription<V>, SubscriptionNode>();
-    private final HashMap<IFlowResource, SubscriptionNode> sinks = new HashMap<IFlowResource, SubscriptionNode>();
+    private final Flow flow;
+    private final QueueStore.QueueDescriptor queueDescriptor;
+    // For now each queue element is assigned a restoreBlock number
+    // which is used for tracking page in requests. A trailing
+    // consumer will request messages from at most one restoreBlock
+    // at a time from the database.
+    private static final int RESTORE_BLOCK_SIZE = 50;
+
+    private static final int ACCEPTED = 0;
+    private static final int NO_MATCH = 1;
+    private static final int DECLINED = 2;
+
+    private final SortedLinkedList<QueueElement> queue = new SortedLinkedList<QueueElement>();
+    private Mapper<K, V> keyMapper;
+
+    private final ElementLoader loader;
+    private final Cursor liveCursor;
+    private QueueStore<K, V> store;
+    private long nextSequenceNumber = 0;
+
+    // Open consumers:
+    private final HashMap<Subscription<V>, SubscriptionContext> consumers = new HashMap<Subscription<V>, SubscriptionContext>();
+
+    // Consumers that are operating against the live cursor:
+    private final LinkedNodeList<SubscriptionContext> liveConsumers = new LinkedNodeList<SubscriptionContext>();
+
+    // Browsing subscriptions:
+    private final LinkedNodeList<SubscriptionContext> liveBrowsers = new LinkedNodeList<SubscriptionContext>();
+
+    // Consumers that are behind the live cursor
+    private final LinkedNodeList<SubscriptionContext> trailingConsumers = new LinkedNodeList<SubscriptionContext>();
+
+    // Consumers that are waiting for elements to be paged in:
+    private final LinkedNodeList<SubscriptionContext> restoringConsumers = new LinkedNodeList<SubscriptionContext>();
+
+    // Limiter/Controller for the size of the queue:
+    private final FlowController<V> sizeController;
+    private final IFlowSizeLimiter<V> sizeLimiter;
+
+    // Memory Limiter and controller operate against the liveCursor.
+    private static final long DEFAULT_MEMORY_LIMIT = 1536;
+    private final IFlowSizeLimiter<QueueElement> memoryLimiter;
+    private final FlowController<QueueElement> memoryController;
+    private boolean useMemoryLimiter;
 
-    private final FlowController<V> sinkController;
-    private final Object mutex;
+    private int totalQueueCount;
 
-    protected Mapper<K, V> keyMapper;
-    private long directs;
+    private boolean initialized = false;
+    private boolean started = false;
 
-    private final ISourceController<V> sourceControler = new ISourceController<V>() {
+    public SharedQueue(String name, IFlowSizeLimiter<V> limiter) {
+        this(name, limiter, null);
+    }
 
-        public Flow getFlow() {
-            return sinkController.getFlow();
-        }
+    SharedQueue(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
+        super(name);
+        liveCursor = new Cursor(name);
+        this.mutex = mutex == null ? new Object() : mutex;
 
-        public void elementDispatched(V elem) {
+        flow = new Flow(getResourceName(), false);
+        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
+        queueDescriptor.setQueueType(QueueDescriptor.SHARED);
+        this.sizeLimiter = limiter;
+
+        this.sizeController = new FlowController<V>(getFlowControllableHook(), flow, limiter, this.mutex);
+        sizeController.useOverFlowQueue(false);
+        super.onFlowOpened(sizeController);
+
+        if (DEFAULT_MEMORY_LIMIT < limiter.getCapacity()) {
+            memoryLimiter = new SizeLimiter<QueueElement>(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) {
+                public int getElementSize(QueueElement qe) {
+                    return qe.size;
+                };
+            };
+            // Use the queue's resolved lock; the 'mutex' argument may be null.
+            memoryController = new FlowController<QueueElement>(null, flow, memoryLimiter, this.mutex) {
+                @Override
+                public IFlowResource getFlowResource() {
+                    return SharedQueue.this;
+                }
+            };
+            useMemoryLimiter = true;
+        } else {
+            useMemoryLimiter = false;
+            memoryLimiter = null;
+            memoryController = null;
         }
 
-        public void onFlowBlock(ISinkController<?> sink) {
-        }
+        loader = new ElementLoader();
 
-        public void onFlowResume(ISinkController<?> sinkController) {
-            IFlowResource sink = sinkController.getFlowResource();
-            synchronized (mutex) {
-                SubscriptionNode node = sinks.get(sink);
-                if (node != null) {
-                    node.unlink();
-                    boolean notify = false;
-                    if (node.cursor == null) {
-                        readyDirectSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
-                    } else {
-                        if (readyPollingSubs.isEmpty()) {
-                            notify = !store.isEmpty();
-                        }
-                        readyPollingSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
-                    }
+    }
 
-                    if (notify) {
-                        notifyReady();
-                    }
+    /**
+     * Called to initialize the queue with values from the message store.
+     * 
+     * @param sequenceMin
+     *            The lowest sequence number in the store.
+     * @param sequenceMax
+     *            The max sequence number in the store.
+     * @param count
+     *            The number of messages in the queue
+     * @param size
+     *            The size of the messages in the queue
+     */
+    public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+        synchronized (mutex) {
+            if (initialized) {
+                throw new IllegalStateException("Already initialized");
+            } else {
+                // Initialize counts:
+                nextSequenceNumber = sequenceMax + 1;
+                if (count > 0) {
+                    sizeLimiter.add(count, size);
+                    totalQueueCount = count;
+                    // Add a paged out placeholder:
+                    QueueElement qe = new QueueElement(null, sequenceMin);
+                    qe.pagedOutCount = count;
+                    qe.pagedOutSize = size;
+                    queue.add(qe);
                 }
+
+                initialized = true;
+                liveCursor.reset(sequenceMin);
+
+                if (DEBUG)
+                    System.out.println(this + "Initialized, first seq: " + sequenceMin + " next sequence: " + nextSequenceNumber);
             }
         }
+    }
 
-        @Override
-        public String toString() {
-            return getResourceName();
-        }
+    public QueueStore.QueueDescriptor getDescriptor() {
+        return queueDescriptor;
+    }
 
-        public boolean isSourceBlocked() {
-            throw new UnsupportedOperationException();
+    public int getEnqueuedCount() {
+        synchronized (mutex) {
+            return totalQueueCount;
         }
+    }
 
-        public IFlowResource getFlowResource() {
-            return SharedQueue.this;
+    public long getEnqueuedSize() {
+        synchronized (mutex) {
+            return sizeLimiter.getSize();
         }
+    }
 
-    };
+    /**
+     * Starts this queue.
+     */
+    public void start() {
+        synchronized (mutex) {
+            if (!initialized) {
+                throw new IllegalStateException("Not able to start uninitialized queue: " + getResourceName());
+            }
 
-    public SharedQueue(String name, IFlowLimiter<V> limiter) {
-        this(name, limiter, new Object());
-        autoRelease = true;
+            if (!started) {
+                started = true;
+                liveCursor.getNext();
+                if (isDispatchReady()) {
+                    notifyReady();
+                }
+            }
+        }
     }
 
     /**
-     * Creates a flow queue that can handle multiple flows.
-     * 
-     * @param flow
-     *            The {@link Flow}
-     * @param controller
-     *            The FlowController if this queue is flow controlled:
+     * Stops this queue.
      */
-    public SharedQueue(String name, IFlowLimiter<V> limiter, Object mutex) {
-        super(name);
-        this.mutex = mutex;
-        Flow flow = new Flow(name, false);
-        this.sinkController = new FlowController<V>(getFlowControllableHook(), flow, limiter, mutex);
-        super.onFlowOpened(sinkController);
+    public void stop() {
+        synchronized (mutex) {
+            started = false;
+        }
     }
 
-    public void setStore(Store<K, V> store) {
-        this.store = store;
+    public void shutdown() {
+        stop();
     }
 
-    protected final ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
-        return sinkController;
-    }
+    public void flowElemAccepted(ISourceController<V> source, V elem) {
 
-    /**
-     * Called when the controller accepts a message for this queue.
-     */
-    public void flowElemAccepted(ISourceController<V> controller, V value) {
         synchronized (mutex) {
 
-            // Try to directly dispatch to one of the attached subscriptions
-            // sourceDispatch returns null on successful dispatch
-            ArrayList<SubscriptionNode> matches = directDispatch(value);
-            if (matches != null) {
-
-                if (directs != 0) {
-                    // System.out.println("could not directly dispatch.. had directly dispatched: "+directs);
-                    directs = 0;
-                }
-
-                K key = keyMapper.map(value);
-                StoreNode<K, V> node = store.add(key, value);
-
-                int matchCount = 0;
-                // Go through the un-ready direct subs and find out if any those
-                // would
-                // have matched the message, and if so then set it up to cursor
-                // from
-                // it.
-                SubscriptionNode sub = unreadyDirectSubs.getHead();
-                while (sub != null) {
-                    SubscriptionNode next = sub.getNext();
-                    if (sub.subscription.matches(value)) {
-                        sub.unlink();
-                        sub.resumeAt(node);
-                        unreadyPollingSubs.addLast(sub);
-                        matchCount++;
-                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
-                    }
-                    sub = next;
+            if (!initialized) {
+                throw new IllegalStateException("Not able to use uninitialized queue: " + getResourceName());
+            }
+
+            // Create a new queue element with the next sequence number:
+            QueueElement qe = new QueueElement(elem, nextSequenceNumber++);
+
+            // Save the element (note that it is important this be done after
+            // we've set the sequence number above)
+            if (!store.isFromStore(elem) && store.isElemPersistent(elem)) {
+                try {
+                    // TODO Revisit delayability criteria (basically,
+                    // opened, unblocked receivers)
+                    store.persistQueueElement(queueDescriptor, source, elem, qe.sequence, true);
+
+                } catch (Exception e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
                 }
+            }
+
+            //Add it to our queue:
+            queue.add(qe);
+            totalQueueCount++;
+            
+            // Check with the loader to see if it needs to be paged out:
+            loader.elementAdded(qe, source);
+
+            // Request dispatch for the newly enqueued element.
+            // TODO consider optimizing to do direct dispatch?
+            // It might be better if the dispatcher itself provided
+            // this for cases where the caller is on the same dispatcher
+            if (isDispatchReady()) {
+                notifyReady();
+                // pollingDispatch();
+            }
+        }
+    }
+
+    public boolean pollingDispatch() {
 
-                // Also do it for all the ready nodes that matched... but which
-                // we
-                // could not enqueue to.
-                for (SubscriptionNode subNode : matches) {
-                    subNode.unlink();
-                    subNode.resumeAt(node);
-                    unreadyPollingSubs.addLast(subNode);
-                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+        synchronized (mutex) {
+            loader.processPageInRequests();
+
+            // Dispatch ready consumers:
+            SubscriptionContext consumer = trailingConsumers.getHead();
+            while (consumer != null) {
+                SubscriptionContext next = consumer.getNext();
+                consumer.trailingDispatch();
+                if (next != null) {
+                    consumer = next;
+                } else {
+                    consumer = trailingConsumers.getHead();
                 }
-                matchCount += matches.size();
+            }
 
-                if (matchCount > 0) {
-                    // We have interested subscriptions for the message.. but
-                    // they are not ready to receive.
-                    // Would be cool if we could flow control the source.
+            // Service any browsers:
+            /*
+             * SubscriptionContext browser = liveBrowsers.getHead(); while
+             * (browser != null) { SubscriptionContext nextBrowser =
+             * browser.getNext(); browser.trailingDispatch(); if (nextBrowser !=
+             * null) { browser = nextBrowser; } else { break; } }
+             */
+
+            // Process live consumers:
+            QueueElement next = liveCursor.getNext();
+            if (next != null && !next.isPagedOut()) {
+
+                // See if there are any interested consumers:
+                consumer = liveConsumers.getHead();
+                boolean interested = false;
+
+                find_consumer: while (consumer != null) {
+
+                    SubscriptionContext nextConsumer = consumer.getNext();
+                    switch (consumer.offer(next, liveCursor)) {
+                    case ACCEPTED:
+                        // Rotate list so this one is last next time:
+                        liveConsumers.rotate();
+                        interested = true;
+                        break find_consumer;
+                    case DECLINED:
+                        interested = true;
+                        break;
+                    case NO_MATCH:
+                        // Move on to the next consumer if this one didn't match
+                        consumer = consumer.getNext();
+                    }
+
+                    consumer = nextConsumer;
                 }
 
-                if (!readyPollingSubs.isEmpty()) {
-                    notifyReady();
+                // Advance the live cursor if no consumer was interested (when the
+                // element is acquired, offer() has already advanced it):
+                if (!interested) {
+                    liveCursor.skip(next);
                 }
-            } else {
-                directs++;
+
+                // Request page in if the next element is paged out:
+                next = liveCursor.getNext();
             }
+            return isDispatchReady();
         }
-    }
 
-    public FlowController<V> getFlowController(Flow flow) {
-        return sinkController;
     }
 
     public boolean isDispatchReady() {
-        return !store.isEmpty() && !readyPollingSubs.isEmpty();
+        if (!initialized) {
+            return false;
+        }
+
+        if (started) {
+            // If we have live consumers, and an element ready for dispatch
+            if (!liveConsumers.isEmpty() && liveCursor.isReady()) {
+                return true;
+            }
+
+            // If there are ready trailing consumers:
+            if (!trailingConsumers.isEmpty()) {
+                return true;
+            }
+
+            // Might consider allowing browsers to browse
+            // while stopped:
+            if (!liveBrowsers.isEmpty()) {
+                return true;
+            }
+        }
+
+        // If there are restored messages ready for enqueue:
+        if (loader.hasRestoredMessages()) {
+            return true;
+        }
+
+        return false;
     }
 
-    private ArrayList<SubscriptionNode> directDispatch(V elem) {
-        ArrayList<SubscriptionNode> matches = new ArrayList<SubscriptionNode>(readyDirectSubs.size());
-        boolean accepted = false;
-        SubscriptionNode next = null;
-        SubscriptionNode node = readyDirectSubs.getHead();
-        while (node != null) {
-            next = node.getNext();
-            if (node.subscription.matches(elem)) {
-                accepted = node.subscription.getSink().offer(elem, sourceControler);
-                if (accepted) {
-                    if (autoRelease) {
-                        sinkController.elementDispatched(elem);
-                    }
-                    break;
-                } else {
-                    matches.add(node);
-                }
+    public void addSubscription(Subscription<V> subscription) {
+        synchronized (mutex) {
+            SubscriptionContext context = new SubscriptionContext(subscription);
+            SubscriptionContext old = consumers.put(subscription, context);
+            if (old != null) {
+                consumers.put(subscription, old);
+            } else {
+                context.start();
             }
-            node = next;
         }
-        if (next != null) {
-            readyDirectSubs.rotateTo(next);
+    }
+
+    public boolean removeSubscription(Subscription<V> subscription) {
+        synchronized (mutex) {
+            SubscriptionContext old = consumers.remove(subscription);
+            if (old != null) {
+                old.close();
+                return true;
+            }
+            return false;
         }
-        return accepted ? null : matches;
     }
 
-    public boolean pollingDispatch() {
+    /**
+     * This class holds state associated with a subscription in this queue.
+     */
+    class SubscriptionContext extends LinkedNode<SubscriptionContext> implements ISourceController<V> {
 
-        // System.out.println("polling dispatch");
+        final Subscription<V> sub;
+        boolean isStarted;
 
-        // Keep looping until we can find one subscription that we can
-        // dispatch a message to.
-        while (true) {
-
-            // Find a subscription that has a message available for dispatch.
-            SubscriptionNode subNode = null;
-            StoreNode<K, V> storeNode = null;
-            synchronized (mutex) {
+        // The consumer's cursor:
+        final Cursor cursor;
 
-                if (readyPollingSubs.isEmpty()) {
-                    return false;
+        SubscriptionContext(Subscription<V> target) {
+            this.cursor = new Cursor(target.toString());
+            this.sub = target;
+            if (queue.isEmpty()) {
+                cursor.reset(liveCursor.sequence);
+            } else {
+                cursor.reset(queue.getHead().sequence);
+            }
+        }
+
+        public void start() {
+            if (!isStarted) {
+                isStarted = true;
+                // If we're behind the live cursor add to the trailing consumer
+                // list:
+                if (updateCursor()) {
+                    trailingConsumers.addLast(this);
+                    notifyReady();
                 }
+            }
+        }
 
-                SubscriptionNode next = null;
-                subNode = readyPollingSubs.getHead();
-                while (subNode != null) {
-                    next = subNode.getNext();
-
-                    storeNode = subNode.cursorPeek();
-                    if (storeNode != null) {
-                        // Found a message..
-                        break;
-                    } else {
-                        // Cursor dried up... this subscriber can now be direct
-                        // dispatched.
-                        // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
-                        subNode.unlink();
-                        readyDirectSubs.addLast(subNode);
-                    }
-                    subNode = next;
+        public void stop() {
+            // If started remove this from any dispatch list
+            if (isStarted) {
+                cursor.deactivate();
+                unlink();
+                isStarted = false;
+            }
+        }
+
+        public void close() {
+            stop();
+        }
+
+        /**
+         * While the consumer is trailing, pollingDispatch() calls this method until
+         * the consumer has caught up.
+         */
+        public final void trailingDispatch() {
+
+            // Update cursor to see if we're still behind
+            if (updateCursor()) {
+                QueueElement next = cursor.getNext();
+
+                // If the next element is paged out,
+                // Add to the list of restoring consumers
+                if (next.pagedOutCount > 0) {
+                    unlink();
+                    restoringConsumers.addLast(this);
+                } else {
+                    offer(next, null);
                 }
+            }
+        }
+
+        /**
+         * Advances this consumer's cursor to the next available element, and
+         * checks whether or not this consumer is caught up to the live cursor.
+         * 
+         * @return true if the cursor is behind the live cursor.
+         */
+        public final boolean updateCursor() {
+            // Advance to the next available element:
+            cursor.getNext();
+
+            // Are we now live?
+            if (cursor.compareTo(liveCursor) >= 0 || queue.isEmpty()) {
+                cursor.deactivate();
+                unlink();
+                liveConsumers.addLast(this);
+                return false;
+            }
+            return true;
+        }
+
+        public final int offer(QueueElement qe, Cursor live) {
+
+            // If we are already past this element, return NO_MATCH:
+            if (cursor.sequence > qe.sequence) {
+                return NO_MATCH;
+            }
+
+            // If this element isn't matched, NO_MATCH:
+            if (!sub.matches(qe.elem)) {
+                cursor.skip(qe);
+                return NO_MATCH;
+            }
 
-                if (storeNode == null) {
-                    return false;
+            // If the sub doesn't remove on dispatch set an ack listener:
+            Subscription.SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch() ? null : qe;
+
+            // See if the sink has room:
+            if (sub.offer(qe.elem, this, callback)) {
+                if (!sub.isBrowser()) {
+                    qe.setAcquired(this);
+                    loader.releaseMemory(qe);
+
+                    // If this came from the live cursor, update it
+                    // if we acquired the element:
+                    if (live != null) {
+                        live.skip(qe);
+                    }
+
+                    // If remove on dispatch acknowledge now:
+                    if (callback == null) {
+                        qe.acknowledge();
+                    }
                 }
 
-                if (next != null) {
-                    readyPollingSubs.rotateTo(next);
+                // Advance our cursor:
+                cursor.skip(qe);
+
+                return ACCEPTED;
+            } else {
+                // Remove from dispatch list until we are resumed:
+                if (DEBUG) {
+                    System.out.println(this + " Declined: " + qe);
                 }
+                return DECLINED;
             }
+        }
+
+        // ///////////////////////////////////////////////////////////////////////////////
+        // ISourceController implementation
+        // ///////////////////////////////////////////////////////////////////////////////
+        public void elementDispatched(V elem) {
+            // No-op we only offer to the consumer
+        }
 
-            // The subscription's sink may be full..
-            IFlowSink<V> sink = subNode.subscription.getSink();
-            boolean accepted = sink.offer(storeNode.getValue(), sourceControler);
+        public Flow getFlow() {
+            return flow;
+        }
 
+        public IFlowResource getFlowResource() {
+            return SharedQueue.this;
+        }
+
+        public void onFlowBlock(ISinkController<?> sinkController) {
+            if (DEBUG)
+                System.out.println(this + " blocked.");
             synchronized (mutex) {
-                if (accepted) {
-                    subNode.cursorNext();
-                    if (subNode.subscription.isPreAcquired() && subNode.subscription.isRemoveOnDispatch()) {
-                        StoreNode<K, V> removed = store.remove(storeNode.getKey());
-                        assert removed != null : "Since the node was aquired.. it should not have been removed by anyone else.";
-                        sinkController.elementDispatched(storeNode.getValue());
+                unlink();
+            }
+        }
+
+        public void onFlowResume(ISinkController<?> sinkController) {
+            if (DEBUG)
+                System.out.println(this + " resumed.");
+            synchronized (mutex) {
+                trailingConsumers.addLast(this);
+                notifyReady();
+            }
+        }
+
+        public String toString() {
+            return sub + ", " + cursor;
+        }
+    }
+
+    class Cursor implements Comparable<Cursor> {
+
+        private final String name;
+        QueueElement current = null;
+        long sequence = -1;
+        boolean paging = false;
+        long restoreBlock = -1;
+        long requestedBlock = -1;
+
+        public Cursor(String name) {
+            this.name = name;
+        }
+
+        public final void reset(long sequence) {
+            updateSequence(sequence);
+            current = null;
+        }
+
+        public void deactivate() {
+            if (paging) {
+                loader.removeBlockInterest(this);
+                requestedBlock = -1;
+            }
+            current = null;
+        }
+
+        private final void updateSequence(final long newSequence) {
+            this.sequence = newSequence;
+            // long newBlock = sequence / RESTORE_BLOCK_SIZE;
+            // if (newBlock != restoreBlock) {
+            // restoreBlock = newBlock;
+            // }
+
+            if (DEBUG && sequence > nextSequenceNumber) {
+                new Exception(this + "cursor overflow").printStackTrace();
+            }
+        }
+
+        private final void checkPageIn() {
+            if (current != null && current.isPagedOut()) {
+                if (current.restoreBlock != requestedBlock) {
+                    if (paging) {
+                        loader.removeBlockInterest(this);
                     }
-                    return true;
-                } else {
-                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
-                    // Subscription is no longer ready..
-                    subNode.cursorUnPeek(storeNode);
-                    subNode.unlink();
-                    unreadyPollingSubs.addLast(subNode);
+                    requestedBlock = current.restoreBlock;
+                    paging = true;
+                    loader.addBlockInterest(this, current);
                 }
+            } else if (paging && requestedBlock != sequence / RESTORE_BLOCK_SIZE) {
+                loader.removeBlockInterest(this);
+                requestedBlock = -1;
+                paging = false;
             }
         }
-    }
 
-    public final V poll() {
-        throw new UnsupportedOperationException("Not supported");
-    }
+        /**
+         * @return true if there is a paged in, unacquired element that is ready
+         *         for dispatch
+         */
+        public final boolean isReady() {
+            getNext();
+            // Possible when the queue is empty
+            if (current == null || current.isAcquired() || current.isPagedOut()) {
+                return false;
+            }
+            return true;
+        }
 
-    public void addSubscription(Subscription<V> subscription) {
-        synchronized (mutex) {
-            SubscriptionNode node = subscriptions.get(subscription);
-            if (node == null) {
-                node = new SubscriptionNode(subscription);
-                subscriptions.put(subscription, node);
-                sinks.put(subscription.getSink(), node);
-                if (!store.isEmpty()) {
-                    readyPollingSubs.addLast(node);
-                    notifyReady();
-                } else {
-                    readyDirectSubs.addLast(node);
+        /**
+         * Sets the cursor to the next sequence number after the provided
+         * element:
+         */
+        public final void skip(QueueElement elem) {
+            QueueElement next = elem.isLinked() ? elem.getNext() : null;
+            if (next != null) {
+                updateSequence(next.sequence);
+                current = next;
+            } else {
+                current = null;
+                updateSequence(sequence + 1);
+            }
+        }
+
+        public final QueueElement getNext() {
+            if (queue.isEmpty() || queue.getTail().sequence < sequence) {
+                current = null;
+                return null;
+            }
+
+            if (queue.getTail().sequence == sequence) {
+                current = queue.getTail();
+            }
+
+            // Get a pointer to the next element (make sure
+            // that our next pointer is linked, it could have
+            // been paged out):
+            if (current == null || !current.isLinked()) {
+                current = queue.upper(sequence, true);
+                if (current == null) {
+                    return null;
+                }
+            }
+
+            // Skip acquired elements:
+            while (current.isAcquired()) {
+                QueueElement last = current;
+                current = current.getNext();
+
+                // If the next element is null, increment our sequence
+                // and return:
+                if (current == null) {
+                    updateSequence(last.getSequence() + 1);
+                    return null;
+                }
+
+                // If we're paged out break, this isn't the
+                // next, but it means that we need to page
+                // in:
+                if (current.isPagedOut()) {
+                    break;
                 }
             }
+
+            if (current.sequence < sequence) {
+                return null;
+            } else {
+                updateSequence(current.sequence);
+            }
+            checkPageIn();
+            return current;
         }
-    }
 
-    public boolean removeSubscription(Subscription<V> subscription) {
-        synchronized (mutex) {
-            SubscriptionNode node = subscriptions.remove(subscription);
-            if (node != null) {
-                sinks.remove(subscription.getSink());
-                node.unlink();
-                return true;
+        public int compareTo(Cursor o) {
+            if (o.sequence > sequence) {
+                return -1;
+            } else if (sequence > o.sequence) {
+                return 1;
+            } else {
+                return 0;
             }
-            return false;
+        }
+
+        public String toString() {
+            return "Cursor: " + sequence + " [" + name + "]";
         }
     }
 
-    private class SubscriptionNode extends LinkedNode<SubscriptionNode> {
-        public final Subscription<V> subscription;
-        public StoreCursor<K, V> cursor;
+    class QueueElement extends SortedLinkedListNode<QueueElement> implements Subscription.SubscriptionDeliveryCallback {
 
-        public SubscriptionNode(Subscription<V> subscription) {
-            this.subscription = subscription;
-            this.cursor = store.openCursor();
+        V elem;
+        SubscriptionContext owner;
+        final long sequence;
+        long restoreBlock;
+
+        // When a queue element is paged out, the first element
+        // in a range of paged out elements keeps track of the count
+        // and size of paged out elements.
+        int pagedOutCount = 0;
+        long pagedOutSize = 0;
+        int size = 0;
+
+        public QueueElement(V elem, long sequence) {
+            this.elem = elem;
+            if (elem != null) {
+                size = sizeLimiter.getElementSize(elem);
+            }
+            this.sequence = sequence;
+            this.restoreBlock = sequence / RESTORE_BLOCK_SIZE;
         }
 
-        public void resumeAt(StoreNode<K, V> node) {
-            this.cursor = store.openCursorAt(node);
+        public void setAcquired(SubscriptionContext owner) {
+            this.owner = owner;
+            sizeController.elementDispatched(elem);
         }
 
-        public void cursorNext() {
-            cursor.next();
+        public final void acknowledge() {
+            synchronized (mutex) {
+                unlink();
+                totalQueueCount--;
+                if (isPagedOut()) {
+                    return;
+                } else if (store.isElemPersistent(elem) || store.isFromStore(elem)) {
+                    store.deleteQueueElement(queueDescriptor, elem);
+                }
+            }
         }
 
-        public StoreNode<K, V> cursorPeek() {
-            if (cursor == null) {
-                return null;
+        public final void unacquire(ISourceController<?> source) {
+            // TODO reenqueue and update cursors back to this position.
+            // If there are subscriptions with selectors this could get
+            // tricky to avoid reevaluating already evaluated selectors.
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Pages this element out to free memory.
+         * 
+         * Memory is freed if upon the return of this call the passed in
+         * sizeController was not blocked. If the sizeController was blocked
+         * then memory is not freed until it is next unblocked.
+         */
+        private void pageOut(ISourceController<?> controller) {
+            // See if we can page this out to save memory:
+            //
+            // Disqualifiers:
+            // - If this is already paged out then nothing to do
+            // - We don't page out acquired elements, the memory
+            // is accounted for by the owner, and we keep them
+            // in memory here, to make sure we don't try to pull
+            // it back in for another consumer
+            // - If there is a cursor active in this element's
+            // restore block don't page out, memory is accounted
+            // for in the cursor's sizeLimiter
+            if (pagedOutCount > 0 || owner != null || loader.inLoadQueue(this)) {
+                return;
             }
-            while (cursor.hasNext()) {
-                StoreNode<K, V> elemNode = cursor.peekNext();
 
-                // Skip over messages that are not a match.
-                if (!subscription.matches(elemNode.getValue())) {
-                    cursor.next();
-                    continue;
+            // If the element is not persistent then we'll need to request a
+            // save:
+            if (!store.isFromStore(elem) && !store.isElemPersistent(elem)) {
+                try {
+                    store.persistQueueElement(queueDescriptor, controller, elem, sequence, false);
+                } catch (Exception e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
                 }
+            }
+
+            pagedOutCount = 1;
+            pagedOutSize = size;
+            elem = null;
+
+            // Collapse adjacent paged out elements:
+            QueueElement next = getNext();
+            QueueElement prev = getPrevious();
+            // If the next element is paged out
+            // replace it with this
+            if (next != null && next.pagedOutCount > 0) {
+                pagedOutCount += next.pagedOutCount;
+                pagedOutSize += next.pagedOutSize;
+                next.unlink();
+            }
+            // If the previous elem is paged out unlink this
+            // entry:
+            if (prev != null && prev.pagedOutCount > 0) {
+                prev.pagedOutCount += pagedOutCount;
+                prev.pagedOutSize += pagedOutSize;
+                unlink();
+            }
+
+            if (DEBUG)
+                System.out.println("Paged out element: " + this);
+        }
 
-                if (subscription.isPreAcquired()) {
-                    if (elemNode.acquire(subscription)) {
-                        return elemNode;
-                    } else {
-                        cursor.next();
-                        continue;
+        /**
+         * Called to relink a paged in element after this element.
+         * 
+         * @param qe
+         *            The paged in element to relink.
+         */
+        public QueueElement pagedIn(QueueElement qe, long nextSequence) {
+            QueueElement ret = qe;
+            // See if we have a pointer to a paged out element:
+            if (sequence == qe.sequence) {
+                // Already paged in? Shouldn't be.
+                if (!isPagedOut()) {
+                    throw new IllegalStateException("Can't page in an already paged in element");
+                } else {
+                    // Otherwise set this element to the paged in one
+                    // and add a new QueueElement to hold any additional
+                    // paged out elements:
+                    elem = qe.elem;
+                    size = qe.size;
+                    pagedOutCount--;
+                    if (pagedOutCount > 0) {
+                        if (nextSequence == -1) {
+                            throw new IllegalStateException("Shouldn't have paged out elements at the end of the queue");
+                        }
+                        qe = new QueueElement(null, nextSequence);
+                        qe.pagedOutCount = pagedOutCount;
+                        qe.pagedOutSize = pagedOutSize - size;
+                        pagedOutCount = 0;
+                        pagedOutSize = 0;
+                        try {
+                            this.linkAfter(qe);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    ret = this;
+                }
+            } else {
+                // Otherwise simply link this element into the list:
+                 queue.add(qe);
+                // Decrement pagedOutCount counter of previous element:
+                if (qe.prev != null && qe.prev.pagedOutCount > 0) {
+                    if(qe.prev.pagedOutCount > 1)
+                    {
+                        throw new IllegalStateException("Skipped paged in element");    
                     }
+                    pagedOutCount = qe.pagedOutCount - 1;
+                    qe.prev.pagedOutCount = 0;
+                    qe.prev.pagedOutSize = 0;
                 }
             }
-            cursor = null;
-            return null;
+
+            if (DEBUG)
+                System.out.println("Paged in element: " + this);
+
+            return ret;
         }
 
-        public void cursorUnPeek(StoreNode<K, V> node) {
-            if (subscription.isPreAcquired()) {
-                node.unacquire();
-            }
+        public boolean isPagedOut() {
+            return elem == null;
+        }
+
+        public boolean isAcquired() {
+            return owner != null;
         }
 
-        @Override
         public String toString() {
-            return "subscription from " + getResourceName() + " to " + subscription;
+            return "QueueElement " + sequence + " pagedOutCount: " + pagedOutCount + " owner: " + owner;
+        }
+
+        @Override
+        public long getSequence() {
+            return sequence;
         }
     }
 
-    public Mapper<K, V> getKeyMapper() {
-        return keyMapper;
+    /**
+     * Handles paging in of elements from the store.
+     * 
+     * If the queue's memory limit is greater than its size, this class: (1) does
+     * the initial load of messages recovered from the queue, and (2) handles
+     * updating the redelivered status of elements.
+     * 
+     * If the queue's memory limit is less than the queue size then this class
+     * tracks cursor activity in the queue, loading elements into memory as they
+     * are needed.
+     * 
+     * @author cmacnaug
+     */
+    private class ElementLoader implements RestoreListener<V> {
+
+        private LinkedList<QueueStore.RestoredElement<V>> fromDatabase = new LinkedList<QueueStore.RestoredElement<V>>();
+        private final HashMap<Long, HashSet<Cursor>> requestedBlocks = new HashMap<Long, HashSet<Cursor>>();
+
+        public boolean inLoadQueue(QueueElement queueElement) {
+            return requestedBlocks.containsKey(queueElement.restoreBlock);
+        }
+
+        /**
+         * Must be called after an element is added to the queue to enforce
+         * memory limits
+         * 
+         * @param qe
+         *            The added element.
+         * @param source
+         *            The source of the message
+         */
+        public final void elementAdded(QueueElement qe, ISourceController<V> source) {
+            if (useMemoryLimiter) {
+                if (!qe.isPagedOut()) {
+                    memoryController.add(qe, source);
+
+                    if (memoryLimiter.getThrottled()) {
+
+                        qe.pageOut(memoryController);
+                        // If we paged it out release memory:
+                        if (qe.isPagedOut()) {
+                            releaseMemory(qe);
+                        }
+                    }
+                }
+            }
+        }
+
+        //Updates memory when an element is loaded from the database:
+        private final void elementLoaded(QueueElement qe) {
+            if (useMemoryLimiter) {
+                memoryController.add(qe, null);
+            }
+        }
+
+        public final void releaseMemory(QueueElement qe) {
+            if (useMemoryLimiter) {
+                memoryController.elementDispatched(qe);
+            }
+        }
+
+        public void addBlockInterest(Cursor cursor, QueueElement element) {
+            HashSet<Cursor> cursors = requestedBlocks.get(cursor.requestedBlock);
+            if (cursors == null) {
+                cursors = new HashSet<Cursor>();
+                requestedBlocks.put(cursor.requestedBlock, cursors);
+
+                // Max sequence number is the end of this restoreBlock:
+                long maxSequence = (cursor.requestedBlock * RESTORE_BLOCK_SIZE) + RESTORE_BLOCK_SIZE;
+                // Don't pull in more than is paged out:
+                int maxCount = Math.min(element.pagedOutCount, RESTORE_BLOCK_SIZE);
+                if(DEBUG)
+                    System.out.println(cursor + " requesting restoreBlock:" + cursor.requestedBlock + " from " + element.getSequence() + " to " + maxSequence + " max: " + maxCount);
+                store.restoreQueueElements(queueDescriptor, element.getSequence(), maxSequence, maxCount, this);
+            }
+            cursors.add(cursor);
+        }
+
+        /**
+         * Withdraws the given cursor's interest in the restore block it last
+         * requested. When the last interested cursor leaves a block and the
+         * memory limiter is in use, every element of that block that is still
+         * in memory is offered for page out, and memory is released for those
+         * that actually were paged out.
+         * 
+         * @param cursor
+         *            The cursor whose requestedBlock should no longer be held.
+         */
+        public void removeBlockInterest(Cursor cursor) {
+            long block = cursor.requestedBlock;
+            HashSet<Cursor> cursors = requestedBlocks.get(block);
+            if (cursors == null) {
+                if (DEBUG)
+                    System.out.println(this + " removeBlockInterest, no consumers " + cursor);
+                return;
+            }
+            if (!cursors.remove(cursor)) {
+                if (DEBUG)
+                    System.out.println(this + " removeBlockInterest, no cursor " + cursor);
+                return;
+            }
+            if (!cursors.isEmpty()) {
+                // Other cursors are still active in this block, keep it loaded.
+                return;
+            }
+
+            requestedBlocks.remove(block);
+            // If this is the last cursor active in this block page out the block:
+            if (!useMemoryLimiter) {
+                return;
+            }
+
+            QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
+            while (qe != null && qe.restoreBlock == block) {
+                // Capture the successor before paging out: pageOut() may
+                // unlink qe when it collapses adjacent paged out entries.
+                QueueElement next = qe.getNext();
+                if (!qe.isPagedOut()) {
+                    qe.pageOut(memoryController);
+                    // If we paged it out release memory:
+                    if (qe.isPagedOut()) {
+                        if (DEBUG)
+                            System.out.println(this + " removeBlockInterest, released memory for: " + qe);
+                        releaseMemory(qe);
+                    }
+                }
+                // pageOut() may also have unlinked the captured successor
+                // (a paged out neighbour is merged into qe). In that case
+                // resume from the first element still linked after it.
+                // NOTE(review): assumes queue.upper(seq, true) returns the
+                // first linked element with sequence >= seq - confirm.
+                if (next != null && !next.isLinked()) {
+                    next = queue.upper(next.sequence + 1, true);
+                }
+                // Always advance, even when qe was already paged out;
+                // otherwise the loop never terminates on such an element.
+                qe = next;
+            }
+        }
+
+        /**
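+         * Links any elements restored from the store back into the queue;
+         * returns immediately if none have been restored. A failure while
+         * relinking an element is printed and shutdown() is called.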
+         */
+        final void processPageInRequests() {
+            LinkedList<RestoredElement<V>> restoredElems = null;
+            synchronized (fromDatabase) {
+                if (fromDatabase.isEmpty()) {
+                    return;
+                }
+                restoredElems = fromDatabase;
+                fromDatabase = new LinkedList<RestoredElement<V>>();
+            }
+
+            // Process restored messages:
+            if (restoredElems != null) {
+                // boolean trailingRestore = false;
+                for (QueueStore.RestoredElement<V> restored : restoredElems) {
+                    try {
+                        V delivery = restored.getElement();
+                        QueueElement qe = new QueueElement(delivery, restored.getSequenceNumber());
+                        QueueElement lower = queue.lower(qe.sequence, true);
+                        qe = lower.pagedIn(qe, restored.getNextSequenceNumber());
+                        loader.elementLoaded(qe);
+
+                    } catch (Exception ioe) {
+                        ioe.printStackTrace();
+                        shutdown();
+                    }
+                }
+
+                // Add restoring consumers back to trailing consumers:
+                if (!restoringConsumers.isEmpty()) {
+                    trailingConsumers.addFirst(restoringConsumers);
+                }
+            }
+        }
+
+        public final boolean hasRestoredMessages() {
+            synchronized (fromDatabase) {
+                return !fromDatabase.isEmpty();
+            }
+        }
+
+        public void elementsRestored(Collection<RestoredElement<V>> msgs) {
+            synchronized (fromDatabase) {
+                fromDatabase.addAll(msgs);
+            }
+            synchronized (mutex) {
+                notifyReady();
+            }
+        }
+
+        public String toString() {
+            return "MsgRetriever " + SharedQueue.this;
+        }
     }
 
     public void setKeyMapper(Mapper<K, V> keyMapper) {
         this.keyMapper = keyMapper;
     }
 
-    public boolean removeByKey(K key) {
-        return false;
+    public void setStore(QueueStore<K, V> store) {
+        this.store = store;
     }
 
-    public boolean removeByValue(V value) {
-        return false;
+    public String toString() {
+        return "SharedQueue: " + getResourceName();
     }
 
     @Override
-    public String toString() {
-        return getResourceName();
+    protected ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
+        return sizeController;
+    }
+
+    public V poll() {
+        throw new UnsupportedOperationException("poll not supported for shared queue");
     }
 
-    public FlowController<V> getFlowControler() {
-        return this.sinkController;
+    public IFlowController<V> getFlowControler() {
+        return sizeController;
     }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java Mon Apr 27 18:40:44 2009
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-import java.util.Iterator;
-
-public interface Store<K, V> {
-
-    public interface StoreNode<K, V> {
-
-        public boolean acquire(Subscription<V> ownerId);
-
-        public void unacquire();
-
-        public V getValue();
-
-        public K getKey();
-
-    }
-
-    public interface StoreCursor<K, V> extends Iterator<StoreNode<K, V>> {
-        public StoreNode<K, V> peekNext();
-
-        public void setNext(StoreNode<K, V> node);
-
-    }
-
-    StoreNode<K, V> remove(K key);
-
-    StoreNode<K, V> add(K key, V value);
-
-    StoreCursor<K, V> openCursor();
-
-    StoreCursor<K, V> openCursorAt(StoreNode<K, V> next);
-
-    boolean isEmpty();
-
-    int size();
-
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java Mon Apr 27 18:40:44 2009
@@ -17,15 +17,89 @@
 package org.apache.activemq.queue;
 
 import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISourceController;
 
 public interface Subscription<E> {
 
+    public interface SubscriptionDeliveryCallback {
+        
+        /**
+         * If {@link Subscription#isBrowser()} returns false this method
+         * indicates that the Subscription is finished with the element
+         * and that it can be removed from the queue. 
+         */
+        public void acknowledge();
+
+        /**
+         * Indicates that the subscription no longer has interest in
+         * the element and that it should be placed back on the queue. 
+         * 
+         * The provided source controller will be blocked if there 
+         * is not enough space available on the sub queue to
+         * reenqueue the element.
+         * 
+         * It is illegal to call this method after a prior call to 
+         * {@link #acknowledge()}. 
+         * 
+         * @param sourceController The source controller.
+         */
+        public void unacquire(ISourceController<?> sourceController);
+    }
+
+    /**
+     * True if the message should be removed from the queue when it is
+     * dispatched to this subscription.
+     * 
+     * @return true if the element should be removed on dispatch
+     */
     public boolean isRemoveOnDispatch();
+    
+    /**
+     * @return True if this is a subscription browser. 
+     */
+    public boolean isBrowser();
 
     public boolean isPreAcquired();
 
-    public boolean matches(E message);
+    /**
+     * Returns true if the Subscription has a selector. If true
+     * is returned the {@link #matches(Object)} will be called
+     * prior to an attempt to offer the message to {@link Subscription}
+     * 
+     * @return true if this {@link Subscription} has a selector.
+     */
+    public boolean hasSelector();
+
+    /**
+     * Called if {@link #hasSelector()} returns true.
+     * 
+     * @param elem
+     *            The element to match.
+     * @return false if the message doesn't match
+     */
+    public boolean matches(E elem);
+
+    /**
+     * Offers an item to the subscription. If the subscription is not remove on
+     * dispatch, then it must call acknowledge method on the callback when it
+     * has acknowledged the message.
+     * 
+     * @param element
+     *            The delivery container the offered element.
+     * @param controller
+     *            The queue's controller, which must be used if the offered
+     *            element exceeds the subscription's buffer limits.
+     * @param callback
+     *            The {@link SubscriptionDeliveryCallback} associated with the element
+     * 
+     * @return true if the element was accepted false otherwise, if false is
+     *         returned the caller must have called
+     *         {@link ISourceController#onFlowBlock(ISinkController)} prior to
+     *         returning false.
+     */
+    public boolean offer(E element, ISourceController<E> controller, SubscriptionDeliveryCallback callback);
 
+    @Deprecated
     public IFlowSink<E> getSink();
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java Mon Apr 27 18:40:44 2009
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-import java.util.HashMap;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class TreeMemoryStore<K, V> implements Store<K, V> {
-
-    AtomicLong counter = new AtomicLong();
-
-    class MemoryStoreNode implements StoreNode<K, V> {
-        private Subscription<V> owner;
-        private final K key;
-        private final V value;
-        private long id = counter.getAndIncrement();
-
-        public MemoryStoreNode(K key, V value) {
-            this.key = key;
-            this.value = value;
-        }
-
-        public boolean acquire(Subscription<V> owner) {
-            if (this.owner == null) {
-                this.owner = owner;
-                return true;
-            }
-            return false;
-        }
-
-        public K getKey() {
-            return key;
-        }
-
-        public V getValue() {
-            return value;
-        }
-
-        @Override
-        public String toString() {
-            return "node:" + id + ", owner=" + owner;
-        }
-
-        public void unacquire() {
-            this.owner = null;
-        }
-
-    }
-
-    class MemoryStoreCursor implements StoreCursor<K, V> {
-        private long last = -1;
-        private MemoryStoreNode next;
-
-        public MemoryStoreCursor() {
-        }
-
-        public MemoryStoreCursor(MemoryStoreNode next) {
-            this.next = next;
-        }
-
-        public void setNext(StoreNode<K, V> next) {
-            this.next = (MemoryStoreNode) next;
-        }
-
-        public boolean hasNext() {
-            if (next != null)
-                return true;
-
-            SortedMap<Long, MemoryStoreNode> m = order.tailMap(last + 1);
-            if (m.isEmpty()) {
-                next = null;
-            } else {
-                next = m.get(m.firstKey());
-            }
-            return next != null;
-        }
-
-        public StoreNode<K, V> peekNext() {
-            hasNext();
-            return next;
-        }
-
-        public StoreNode<K, V> next() {
-            try {
-                hasNext();
-                return next;
-            } finally {
-                last = next.id;
-                next = null;
-            }
-        }
-
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-
-    protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
-    protected TreeMap<Long, MemoryStoreNode> order = new TreeMap<Long, MemoryStoreNode>();
-
-    public StoreNode<K, V> add(K key, V value) {
-        MemoryStoreNode rc = new MemoryStoreNode(key, value);
-        MemoryStoreNode oldNode = map.put(key, rc);
-        if (oldNode != null) {
-            map.put(key, oldNode);
-            throw new IllegalArgumentException("Duplicate key violation");
-        }
-        order.put(rc.id, rc);
-        return rc;
-    }
-
-    public StoreNode<K, V> remove(K key) {
-        MemoryStoreNode node = (MemoryStoreNode) map.remove(key);
-        if (node != null) {
-            order.remove(node.id);
-        }
-        return node;
-    }
-
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursor() {
-        MemoryStoreCursor cursor = new MemoryStoreCursor();
-        return cursor;
-    }
-
-    public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursorAt(org.apache.activemq.queue.Store.StoreNode<K, V> next) {
-        MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next);
-        return cursor;
-    }
-
-    public int size() {
-        return map.size();
-    }
-
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java Mon Apr 27 18:40:44 2009
@@ -1,4 +1,19 @@
-package org.apache.activemq.util;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.activemq.util;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -6,24 +21,6 @@
 import org.apache.kahadb.util.LinkedNodeList;
 import org.apache.kahadb.util.LinkedNode;
 
-/**
- * <p>
- * Title: Sonic MQ v6.1
- * </p>
- * <p>
- * Description: Sonic MQ v6.1
- * </p>
- * <p>
- * Copyright: Copyright (c) 2004
- * </p>
- * <p>
- * Company: Sonic Software Corporation
- * </p>
- * 
- * @author Colin MacNaughton
- * @version 6.1
- */
-
 public class HashList<E> {
     private HashMap<E, HashListNode> m_index = null;
     private LinkedNodeList<HashListNode> m_list = null;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java Mon Apr 27 18:40:44 2009
@@ -24,10 +24,10 @@
 public interface StatefulWireFormat extends WireFormat{
 
     /**
-     * Writes a command to the target buffer, returning false if
-     * the command couldn't entirely fit into the target. 
+     * Writes a command to the sub buffer, returning false if
+     * the command couldn't entirely fit into the sub. 
      * @param command
-     * @param target
+     * @param sub
      * @return
      */
     public boolean marshal(Object command, ByteBuffer target) throws IOException;

Modified: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Mon Apr 27 18:40:44 2009
@@ -57,6 +57,7 @@
   optional bytes encoding = 3 [java_override_type = "AsciiBuffer"];
   optional bytes buffer = 4;
   optional int64 streamKey=5;
+  optional int32 messageSize=6;
 }  
 
 ///////////////////////////////////////////////////////////////
@@ -65,6 +66,10 @@
 
 message QueueAdd {
   optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+  optional int32 applicationType = 2; 
+  optional bytes parentName = 3 [java_override_type = "AsciiBuffer"];
+  optional int32 queueType = 4;
+  optional int32 partitionId = 5;
 }  
 message QueueRemove {
   optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
@@ -74,6 +79,7 @@
   optional int64 queueKey=2;
   optional int64 messageKey=3;
   optional bytes attachment = 4;
+  optional int32 messageSize=5;
 }  
 message QueueRemoveMessage {
   optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Mon Apr 27 18:40:44 2009
@@ -29,8 +29,6 @@
 
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageBroker;
-import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.broker.Queue;
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
@@ -39,7 +37,6 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.Mapper;
 
 public abstract class BrokerTestBase extends TestCase {
 
@@ -53,6 +50,7 @@
     protected static final boolean USE_INPUT_QUEUES = true;
 
     protected final boolean USE_KAHA_DB = true;
+    protected final boolean PURGE_STORE = true;
     protected final boolean PERSISTENT = true;
     protected final boolean DURABLE = true;
 
@@ -93,19 +91,6 @@
     final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
     final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
 
-    static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
-        public AsciiBuffer map(MessageDelivery element) {
-            return element.getMsgId();
-        }
-    };
-    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            // we modulo 10 to have at most 10 partitions which the producers
-            // gets split across.
-            return (int) (element.getProducerId().hashCode() % 10);
-        }
-    };
-
     @Override
     protected void setUp() throws Exception {
         dispatcher = createDispatcher();
@@ -139,7 +124,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -155,7 +140,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -171,7 +156,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -187,7 +172,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -203,7 +188,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -219,7 +204,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -235,7 +220,7 @@
         createConnections();
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -259,7 +244,7 @@
         consumers.get(0).setThinkTime(50);
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -282,7 +267,7 @@
         }
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
             reportRates();
         } finally {
@@ -310,7 +295,7 @@
         consumers.get(0).setThinkTime(1);
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
 
             System.out.println("Checking rates for test: " + getName());
@@ -349,7 +334,7 @@
         consumers.get(0).setThinkTime(1);
 
         // Start 'em up.
-        startServices();
+        startClients();
         try {
 
             System.out.println("Checking rates for test: " + getName());
@@ -391,6 +376,8 @@
             sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
             brokers.add(sendBroker);
         }
+        
+        startBrokers();
 
         Destination[] dests = new Destination[destCount];
 
@@ -400,11 +387,9 @@
             bean.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
             dests[i] = bean;
             if (ptp) {
-                Queue queue = createQueue(sendBroker, dests[i]);
-                sendBroker.getDefaultVirtualHost().addQueue(queue);
+                sendBroker.getDefaultVirtualHost().createQueue(dests[i]);
                 if (multibroker) {
-                    queue = createQueue(rcvBroker, dests[i]);
-                    rcvBroker.getDefaultVirtualHost().addQueue(queue);
+                    rcvBroker.getDefaultVirtualHost().createQueue(dests[i]);
                 }
             }
         }
@@ -473,17 +458,6 @@
 
     abstract protected RemoteProducer cerateProducer();
 
-    private Queue createQueue(MessageBroker broker, Destination destination) {
-        Queue queue = new Queue();
-        queue.setBroker(broker);
-        queue.setDestination(destination);
-        queue.setKeyExtractor(KEY_MAPPER);
-        if (usePartitionedQueue) {
-            queue.setPartitionMapper(PARTITION_MAPPER);
-        }
-        return queue;
-    }
-
     private MessageBroker createBroker(String name, String bindURI, String connectUri) throws Exception {
         MessageBroker broker = new MessageBroker();
         broker.setName(name);
@@ -502,8 +476,8 @@
             store = StoreFactory.createStore("memory");
         }
 
-        store.setStoreDirectory(new File("target/test-data/broker-test/" +  broker.getName()));
-        store.setDeleteAllMessages(true);
+        store.setStoreDirectory(new File("sub/test-data/broker-test/" +  broker.getName()));
+        store.setDeleteAllMessages(PURGE_STORE);
         return store;
     }
 
@@ -523,10 +497,15 @@
         }
     }
 
-    private void startServices() throws Exception {
+    private void startBrokers() throws Exception
+    {
         for (MessageBroker broker : brokers) {
             broker.start();
         }
+    }
+    
+    private void startClients() throws Exception {
+        
         for (RemoteConsumer connection : consumers) {
             connection.start();
         }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Mon Apr 27 18:40:44 2009
@@ -84,7 +84,7 @@
         try {
             if (command.getClass() == WireFormatInfo.class) {
             } else if (command.getClass() == BrokerInfo.class) {
-                System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+                //System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
             } else if (command.getClass() == MessageDispatch.class) {
                 MessageDispatch msg = (MessageDispatch) command;
                 lastMessage = msg.getMessage();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Mon Apr 27 18:40:44 2009
@@ -73,7 +73,7 @@
         try {
             if (command.getClass() == WireFormatInfo.class) {
             } else if (command.getClass() == BrokerInfo.class) {
-                System.out.println("Producer " + name + " connected to " + ((BrokerInfo) command).getBrokerName());
+                //System.out.println("Producer " + name + " connected to " + ((BrokerInfo) command).getBrokerName());
             } else if (command.getClass() == ProducerAck.class) {
                 ProducerAck fc = (ProducerAck) command;
                 synchronized (outboundQueue) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Mon Apr 27 18:40:44 2009
@@ -31,6 +31,7 @@
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
 
 public abstract class StorePerformanceBase extends TestCase {
 
@@ -39,7 +40,7 @@
     
     
     private Store store;
-    private AsciiBuffer queueName;
+    private QueueStore.QueueDescriptor queueId;
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
     protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -54,11 +55,12 @@
         store = createStore();
         store.start();
         
-        queueName = new AsciiBuffer("test");
+        queueId = new QueueStore.QueueDescriptor();
+        queueId.setQueueName(new AsciiBuffer("test"));
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
-                session.queueAdd(queueName);
+                session.queueAdd(queueId);
             }
         }, null);
     }
@@ -111,6 +113,7 @@
                     messageRecord.setMessageId(new AsciiBuffer(""+i));
                     messageRecord.setEncoding(new AsciiBuffer("encoding"));
                     messageRecord.setBuffer(buffer);
+                    messageRecord.setSize(buffer.getLength());
 
                     Runnable onFlush = new Runnable(){
                         public void run() {
@@ -120,13 +123,16 @@
                             }
                         }
                     };
+                    final long queueKey = i + 1;
                     store.execute(new VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
                             session.messageAdd(messageRecord);
                             QueueRecord queueRecord = new Store.QueueRecord();
                             queueRecord.setMessageKey(messageRecord.getKey());
-                            session.queueAddMessage(queueName, queueRecord);
+                            queueRecord.setQueueKey(queueKey);
+                            queueRecord.setSize(messageRecord.getSize());
+                            session.queueAddMessage(queueId, queueRecord);
                         }
                     }, onFlush);
                     
@@ -185,11 +191,11 @@
                     store.execute(new VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
-                            Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueName, null, 1000);
+                            Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
                             for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
                                 QueueRecord r = iterator.next();
                                 records.add(session.messageGetRecord(r.getMessageKey()));
-                                session.queueRemoveMessage(queueName, r.messageKey);
+                                session.queueRemoveMessage(queueId, r.messageKey);
                             }
                         }
                     }, onFlush);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Mon Apr 27 18:40:44 2009
@@ -21,18 +21,21 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.FatalStoreException;
 import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.VoidCallback;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
 
 public abstract class StoreTestBase extends TestCase {
 
     private Store store;
 
-    abstract protected Store createStore();
+    abstract protected Store createStore(boolean delete);
 
     abstract protected boolean isStoreTransactional();
 
@@ -40,7 +43,7 @@
 
     @Override
     protected void setUp() throws Exception {
-        store = createStore();
+        store = createStore(true);
         store.start();
     }
 
@@ -57,6 +60,7 @@
         expected.setEncoding(new AsciiBuffer("encoding"));
         expected.setMessageId(new AsciiBuffer("1000"));
         expected.setKey(store.allocateStoreTracking());
+        expected.setSize(expected.getBuffer().getLength());
         
         store.execute(new VoidCallback<Exception>() {
             public void run(Session session) throws Exception {
@@ -74,7 +78,10 @@
     }
 
     public void testQueueAdd() throws Exception {
-        final AsciiBuffer expected = new AsciiBuffer("test");
+        final QueueStore.QueueDescriptor expected = new QueueStore.QueueDescriptor();
+        expected.setQueueName(new AsciiBuffer("testQueue"));
+        expected.setApplicationType((short)1);
+        
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -82,23 +89,102 @@
             }
         }, null);
 
+        //Test that the queue was created:
+        checkQueue(expected, 0, 0);
+        
+        if(isStorePersistent())
+        {
+            //Restart the store and make sure the queue is still there
+            store.stop();
+            store = createStore(false);
+            store.start();
+            
+            //Test that the queue was persisted
+            checkQueue(expected, 0, 0);
+        }
+    }
+    
+    public void testQueueMessageAdd() throws Exception {
+        final QueueStore.QueueDescriptor queue = new QueueStore.QueueDescriptor();
+        queue.setQueueName(new AsciiBuffer("testQueue"));
+        queue.setApplicationType((short)1);
+        
+        final MessageRecord message = new MessageRecord();
+        message.setBuffer(new Buffer("buffer"));
+        message.setEncoding(new AsciiBuffer("encoding"));
+        message.setMessageId(new AsciiBuffer("1000"));
+        message.setKey(store.allocateStoreTracking());
+        message.setSize(message.getBuffer().getLength());
+        
+        final QueueRecord qRecord = new QueueRecord();
+        qRecord.setMessageKey(message.getKey());
+        qRecord.setQueueKey(1L);
+        qRecord.setSize(message.getSize());
+        
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
-                Iterator<AsciiBuffer> list = session.queueList(null, 100);
-                assertTrue(list.hasNext());
-                AsciiBuffer actual = list.next();
-                assertEquals(expected, actual);
+                session.queueAdd(queue);
+                session.messageAdd(message);
+                session.queueAddMessage(queue, qRecord);
             }
         }, null);
+
+        checkQueue(queue, message.getSize(), 1);
+        checkMessageRestore(queue, qRecord, message);
+        
+        //Restart the store and make sure the queue is still there
+        if(isStorePersistent())
+        {
+            store.stop();
+            store = createStore(false);
+            store.start();
+            
+            //Test that the queue was persisted
+            checkQueue(queue, message.getSize(), 1);
+            checkMessageRestore(queue, qRecord, message);
+        }
     }
 
+    private void checkQueue(final QueueStore.QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception
+    {
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<QueueQueryResult> list = session.queueList(null, 100);
+                assertTrue(list.hasNext());
+                QueueQueryResult actual = list.next();
+                assertEquals(queue, actual.getDescriptor());
+                assertEquals(expectedSize, actual.getSize());
+                assertEquals(expectedCount, actual.getCount());
+            }
+        }, null);
+    }
+    
+    private void checkMessageRestore(final QueueStore.QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message ) throws FatalStoreException, Exception
+    {
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<QueueRecord> qRecords = session.queueListMessagesQueue(queue, 0L, -1L, -1);
+                assertTrue(qRecords.hasNext());
+                QueueRecord qr = qRecords.next();
+                assertEquals(qRecord.getQueueKey(), qr.getQueueKey());
+                assertEquals(qRecord.getMessageKey(), message.getKey());
+                MessageRecord record = session.messageGetRecord(qr.getMessageKey());
+                assertEquals(record, message);
+            }
+        }, null);
+    }
+    
     public void testStoreExecuteExceptionPassthrough() throws Exception {
         try {
             store.execute(new VoidCallback<Exception>() {
                 @Override
                 public void run(Session session) throws Exception {
-                    session.queueAdd(new AsciiBuffer("test"));
+                    QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+                    qd.setQueueName(new AsciiBuffer("test"));
+                    session.queueAdd(qd);
                     throw new IOException("Expected");
                 }
             }, null);
@@ -113,12 +199,11 @@
             store.execute(new VoidCallback<Exception>() {
                 @Override
                 public void run(Session session) throws Exception {
-                    Iterator<AsciiBuffer> list = session.queueList(null, 100);
+                    Iterator<QueueQueryResult> list = session.queueList(null, 100);
                     assertFalse(list.hasNext());
                 }
             }, null);
         }
-
     }
 
     static void assertEquals(MessageRecord expected, MessageRecord actual) {
@@ -126,6 +211,17 @@
         assertEquals(expected.getEncoding(), actual.getEncoding());
         assertEquals(expected.getMessageId(), actual.getMessageId());
         assertEquals(expected.getStreamKey(), actual.getStreamKey());
+        assertEquals(expected.getSize(), actual.getSize());
+    }
+    
+    static void assertEquals(QueueStore.QueueDescriptor expected, QueueStore.QueueDescriptor actual) {
+        assertEquals(expected.getParent(), actual.getParent());
+        assertEquals(expected.getQueueType(), actual.getQueueType());
+        assertEquals(expected.getApplicationType(), actual.getApplicationType());
+        assertEquals(expected.getPartitionKey(), actual.getPartitionKey());
+        assertEquals(expected.getQueueName(), actual.getQueueName());
+        //TODO test partitions?
+        
     }
 
 }



Mime
View raw message