activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r773616 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/broker/store/ main/java/o...
Date Mon, 11 May 2009 16:25:10 GMT
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Mon May 11 16:25:10 2009
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.queue;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Map.Entry;
 
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -34,6 +36,8 @@
 import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore.RestoreListener;
 import org.apache.activemq.queue.QueueStore.RestoredElement;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
+import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.SortedLinkedList;
 import org.apache.activemq.util.SortedLinkedListNode;
@@ -69,38 +73,40 @@
     private Mapper<K, V> keyMapper;
 
     private final ElementLoader loader;
-    private final Cursor liveCursor;
+    private Cursor sharedCursor;
     private QueueStore<K, V> store;
+    private PersistencePolicy<V> persistencePolicy;
     private long nextSequenceNumber = 0;
 
     // Open consumers:
     private final HashMap<Subscription<V>, SubscriptionContext> consumers = new HashMap<Subscription<V>, SubscriptionContext>();
+    // Tracks count of active subscriptions with a selector:
+    private int activeSelectorSubs = 0;
 
-    // Consumers that are operating against the live cursor:
-    private final LinkedNodeList<SubscriptionContext> liveConsumers = new LinkedNodeList<SubscriptionContext>();
+    // Consumers that are operating against the shared cursor:
+    private final LinkedNodeList<SubscriptionContext> sharedConsumers = new LinkedNodeList<SubscriptionContext>();
 
-    // Browsing subscriptions:
-    private final LinkedNodeList<SubscriptionContext> liveBrowsers = new LinkedNodeList<SubscriptionContext>();
+    // Browsing subscriptions that are ready for dispatch:
+    private final LinkedNodeList<SubscriptionContext> readyBrowsers = new LinkedNodeList<SubscriptionContext>();
 
-    // Consumers that are behind the live cursor
+    // Consumers that are behind the shared cursor
     private final LinkedNodeList<SubscriptionContext> trailingConsumers = new LinkedNodeList<SubscriptionContext>();
 
-    // Consumers that are waiting for elements to be paged in:
-    private final LinkedNodeList<SubscriptionContext> restoringConsumers = new LinkedNodeList<SubscriptionContext>();
-
     // Limiter/Controller for the size of the queue:
     private final FlowController<V> sizeController;
     private final IFlowSizeLimiter<V> sizeLimiter;
 
-    // Memory Limiter and controller operate against the liveCursor.
-    private static final long DEFAULT_MEMORY_LIMIT = 1000;
-    private boolean useMemoryLimiter;
+    // Default cursor memory limit:
+    private static final long DEFAULT_MEMORY_LIMIT = 10;
 
     private int totalQueueCount;
 
     private boolean initialized = false;
     private boolean started = false;
 
+    private Mapper<Long, V> expirationMapper;
+    private final Expirator expirator = new Expirator();
+
     public SharedQueue(String name, IFlowSizeLimiter<V> limiter) {
         this(name, limiter, null);
     }
@@ -119,14 +125,6 @@
         sizeController.useOverFlowQueue(false);
         super.onFlowOpened(sizeController);
 
-        liveCursor = new Cursor(name);
-
-        if (DEFAULT_MEMORY_LIMIT < limiter.getCapacity()) {
-            useMemoryLimiter = true;
-        } else {
-            useMemoryLimiter = false;
-        }
-
         loader = new ElementLoader();
 
     }
@@ -148,6 +146,14 @@
             if (initialized) {
                 throw new IllegalStateException("Already initialized");
             } else {
+
+                // Default persistence policy when not set.
+                if (persistencePolicy == null) {
+                    persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
+                }
+
+                sharedCursor = new Cursor(queueDescriptor.getQueueName().toString(), true, true);
+
                 // Initialize counts:
                 nextSequenceNumber = sequenceMax + 1;
                 if (count > 0) {
@@ -155,13 +161,21 @@
                     totalQueueCount = count;
                     // Add a paged out placeholder:
                     QueueElement qe = new QueueElement(null, sequenceMin);
-                    qe.pagedOutCount = count;
-                    qe.pagedOutSize = size;
+                    qe.loaded = false;
                     queue.add(qe);
                 }
 
                 initialized = true;
-                liveCursor.reset(sequenceMin);
+                sharedCursor.reset(sequenceMin);
+
+                // Create an expiration mapper if one is not set.
+                if (expirationMapper == null) {
+                    expirationMapper = new Mapper<Long, V>() {
+                        public Long map(V element) {
+                            return -1L;
+                        }
+                    };
+                }
 
                 if (DEBUG)
                     System.out.println(this + "Initialized, first seq: " + sequenceMin + " next sequence: " + nextSequenceNumber);
@@ -196,7 +210,8 @@
 
             if (!started) {
                 started = true;
-                liveCursor.activate();
+                sharedCursor.activate();
+                expirator.start();
                 if (isDispatchReady()) {
                     notifyReady();
                 }
@@ -230,17 +245,12 @@
 
             // Save the element (note that it is important this be done after
             // we've set the sequence number above)
-            if (!store.isFromStore(elem) && store.isElemPersistent(elem)) {
-                try {
-                    // TODO Revisit delayability criteria (basically,
-                    // opened, unblocked receivers, that aren't too far
-                    // from this element)
-                    store.persistQueueElement(queueDescriptor, source, elem, qe.sequence, true);
-
-                } catch (Exception e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
+            if (persistencePolicy.isPersistent(elem)) {
+                // For now base decision on whether to delay flush on
+                // whether or not there are
+                // consumers ready.
+                boolean delayable = !sharedConsumers.isEmpty();
+                qe.save(source, delayable);
             }
 
             // Add it to our queue:
@@ -248,6 +258,7 @@
             totalQueueCount++;
             // Check with the loader to see if it needs to be paged out:
             loader.elementAdded(qe, source);
+            expirator.elementAdded(qe);
 
             // Request dispatch for the newly enqueued element.
             // TODO consider optimizing to do direct dispatch?
@@ -255,7 +266,6 @@
             // this for cases where the caller is on the same dispatcher
             if (isDispatchReady()) {
                 notifyReady();
-                // pollingDispatch();
             }
         }
     }
@@ -263,7 +273,9 @@
     public boolean pollingDispatch() {
 
         synchronized (mutex) {
-            loader.processPageInRequests();
+            loader.processLoadRequests();
+
+            expirator.dispatch();
 
             // Dispatch ready consumers:
             SubscriptionContext consumer = trailingConsumers.getHead();
@@ -278,49 +290,55 @@
             }
 
             // Service any browsers:
-            /*
-             * SubscriptionContext browser = liveBrowsers.getHead(); while
-             * (browser != null) { SubscriptionContext nextBrowser =
-             * browser.getNext(); browser.trailingDispatch(); if (nextBrowser !=
-             * null) { browser = nextBrowser; } else { break; } }
-             */
-
-            // Process live consumers:
-            QueueElement next = liveCursor.getNext();
-            if (next != null && !next.isPagedOut()) {
-
-                // See if there are any interested consumers:
-                consumer = liveConsumers.getHead();
-                boolean interested = false;
-
-                find_consumer: while (consumer != null) {
-
-                    SubscriptionContext nextConsumer = consumer.getNext();
-                    switch (consumer.offer(next, liveCursor)) {
-                    case ACCEPTED:
-                        // Rotate list so this one is last next time:
-                        liveConsumers.rotate();
-                        interested = true;
-                        break find_consumer;
-                    case DECLINED:
-                        interested = true;
-                        break;
-                    case NO_MATCH:
-                        // Move on to the next consumer if this one didn't match
-                        consumer = consumer.getNext();
-                    }
-
-                    consumer = nextConsumer;
+            SubscriptionContext browser = readyBrowsers.getHead();
+            while (browser != null) {
+                SubscriptionContext nextBrowser = browser.getNext();
+                browser.trailingDispatch();
+                if (nextBrowser != null) {
+                    browser = nextBrowser;
+                } else {
+                    break;
                 }
+            }
 
-                // Advance the live cursor if this element was acquired
-                // or there was no interest:
-                if (!interested) {
-                    liveCursor.skip(next);
-                }
+            // Process shared consumers:
+            if (!sharedConsumers.isEmpty()) {
+                QueueElement next = sharedCursor.getNext();
+                if (next != null) {
+
+                    // See if there are any interested consumers:
+                    consumer = sharedConsumers.getHead();
+                    boolean interested = false;
+
+                    find_consumer: while (consumer != null) {
+
+                        // Get the nextConsumer now since the consumer may
+                        // remove itself
+                        // from the list when we offer to it:
+                        SubscriptionContext nextConsumer = consumer.getNext();
+                        switch (consumer.offer(next)) {
+                        case ACCEPTED:
+                            // Rotate list so this one is last next time:
+                            sharedConsumers.rotate();
+                            interested = true;
+                            break find_consumer;
+                        case DECLINED:
+                            interested = true;
+                            break;
+                        case NO_MATCH:
+                            // Move on to the next consumer if this one didn't
+                            // match
+                            consumer = consumer.getNext();
+                        }
+
+                        consumer = nextConsumer;
+                    }
 
-                // Request page in if the next element is paged out:
-                liveCursor.getNext();
+                    // Advance the shared cursor if no one was interested:
+                    if (!interested) {
+                        sharedCursor.skip(next);
+                    }
+                }
             }
             return isDispatchReady();
         }
@@ -333,8 +351,8 @@
         }
 
         if (started) {
-            // If we have live consumers, and an element ready for dispatch
-            if (!liveConsumers.isEmpty() && liveCursor.isReady()) {
+            // If we have shared consumers, and an element ready for dispatch
+            if (!sharedConsumers.isEmpty() && sharedCursor.isReady()) {
                 return true;
             }
 
@@ -345,7 +363,7 @@
 
             // Might consider allowing browsers to browse
             // while stopped:
-            if (!liveBrowsers.isEmpty()) {
+            if (!readyBrowsers.isEmpty()) {
                 return true;
             }
         }
@@ -355,6 +373,10 @@
             return true;
         }
 
+        if (expirator.needsDispatch()) {
+            return true;
+        }
+
         return false;
     }
 
@@ -393,33 +415,39 @@
         final Cursor cursor;
 
         SubscriptionContext(Subscription<V> target) {
-            this.cursor = new Cursor(target.toString());
             this.sub = target;
-            // TODO If this consumer doesn't have a selector
-            // and there are other consumers without a selector
-            // we can join the live cursor as well.
-            if (queue.isEmpty()) {
-                cursor.reset(liveCursor.sequence);
-            } else {
-                cursor.reset(queue.getHead().sequence);
-            }
+            this.cursor = new Cursor(target.toString(), !sub.isBrowser(), true);
+            cursor.setCursorReadyListener(new CursorReadyListener() {
+                public void onElementReady() {
+                    if (!isLinked()) {
+                        updateDispatchList();
+                    }
+                }
+            });
         }
 
         public void start() {
             if (!isStarted) {
                 isStarted = true;
-                // If we're behind the live cursor add to the trailing consumer
-                // list:
-                if (updateCursor()) {
-                    trailingConsumers.addLast(this);
-                    notifyReady();
+                if (sub.hasSelector() && !sub.isBrowser()) {
+                    activeSelectorSubs++;
+                }
+                if (queue.isEmpty()) {
+                    cursor.reset(sharedCursor.getCurrentSequeunce());
+                } else {
+                    cursor.reset(queue.getHead().sequence);
                 }
+
+                updateDispatchList();
             }
         }
 
         public void stop() {
             // If started remove this from any dispatch list
             if (isStarted) {
+                if (sub.hasSelector() && !sub.isBrowser()) {
+                    activeSelectorSubs--;
+                }
                 cursor.deactivate();
                 unlink();
                 isStarted = false;
@@ -436,45 +464,92 @@
          */
         public final void trailingDispatch() {
 
-            // Update cursor to see if we're still behind
-            if (updateCursor()) {
-                QueueElement next = cursor.getNext();
-
-                // If the next element is paged out,
-                // Add to the list of restoring consumers
-                if (next.pagedOutCount > 0) {
-                    unlink();
-                    restoringConsumers.addLast(this);
-                } else {
-                    offer(next, null);
+            if (checkJoinShared()) {
+                return;
+            }
+
+            QueueElement next = cursor.getNext();
+            // If the next element isn't yet available
+            // then unlink this subscription
+            if (next == null) {
+                unlink();
+            } else {
+                offer(next);
+            }
+        }
+
+        private final boolean checkJoinShared() {
+            if (list == sharedConsumers) {
+                return true;
+            }
+
+            // Browsers always operate independently:
+            if (sub.isBrowser()) {
+                return false;
+            }
+
+            // TODO Even if there are subscriptions with selectors present
+            // we can still join the shared cursor as long as there is at
+            // least one ready selector-less sub.
+            boolean join = false;
+            if (activeSelectorSubs == 0) {
+                join = true;
+            } else {
+                cursor.getNext();
+                if (queue.isEmpty() || cursor.compareTo(sharedCursor) >= 0) {
+                    join = true;
                 }
             }
+
+            // Have we joined the shared cursor? If so deactivate our cursor,
+            // and link to the
+            // sharedConsumers list:
+            if (join) {
+                cursor.deactivate();
+                unlink();
+                sharedConsumers.addLast(this);
+                return true;
+            }
+            return false;
         }
 
         /**
-         * Advances the liveCursor to the next available element. And checks
-         * whether or not this consumer is caught up to the live cursor
-         * 
-         * @return true if the cursor is behind the live cursor.
+         * Adds this subscription to the appropriate dispatch list:
          */
-        public final boolean updateCursor() {
-            // Advance to the next available element:
-            cursor.getNext();
+        private final void updateDispatchList() {
 
-            // Are we now live?
-            if (cursor.compareTo(liveCursor) >= 0 || queue.isEmpty()) {
-                cursor.deactivate();
-                unlink();
-                liveConsumers.addLast(this);
-                return false;
+            if (!checkJoinShared()) {
+                // Make sure our cursor is activated:
+                cursor.activate();
+                // If our next element is ready, add to the matching
+                // dispatch list (trailing consumers or ready browsers):
+                if (cursor.isReady()) {
+                    if (!sub.isBrowser()) {
+                        trailingConsumers.addLast(this);
+                    } else {
+                        readyBrowsers.addLast(this);
+                    }
+
+                    if (isDispatchReady()) {
+                        notifyReady();
+                    }
+                } else {
+                    // Unlink ourselves if our cursor isn't ready:
+                    unlink();
+                }
+            } else {
+                // Notify ready if we were the first in the list and
+                // we are ready for dispatch:
+                if (sharedConsumers.size() == 1 && isDispatchReady()) {
+                    notifyReady();
+                }
             }
-            return true;
         }
 
-        public final int offer(QueueElement qe, Cursor live) {
+        public final int offer(QueueElement qe) {
 
             // If we are already passed this element return NO_MATCH:
-            if (cursor.sequence > qe.sequence) {
+            if (cursor.getCurrentSequeunce() > qe.sequence) {
                 return NO_MATCH;
             }
 
@@ -484,20 +559,20 @@
                 return NO_MATCH;
             }
 
+            // Check for expiration:
+            if (qe.isExpired()) {
+                qe.acknowledge();
+                return ACCEPTED;
+            }
+
             // If the sub doesn't remove on dispatch set an ack listener:
-            Subscription.SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch() ? null : qe;
+            SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch() ? null : qe;
 
             // See if the sink has room:
             if (sub.offer(qe.elem, this, callback)) {
                 if (!sub.isBrowser()) {
                     qe.setAcquired(this);
 
-                    // If this came from the live cursor, update it
-                    // if we acquired the element:
-                    if (live != null) {
-                        live.skip(qe);
-                    }
-
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {
                         qe.acknowledge();
@@ -544,8 +619,7 @@
             if (DEBUG)
                 System.out.println(this + " resumed.");
             synchronized (mutex) {
-                trailingConsumers.addLast(this);
-                notifyReady();
+                updateDispatchList();
             }
         }
 
@@ -554,12 +628,19 @@
         }
     }
 
+    public interface CursorReadyListener {
+        public void onElementReady();
+    }
+
     class Cursor implements Comparable<Cursor> {
 
+        private CursorReadyListener readyListener;
+
         private final String name;
         private boolean activated = false;;
 
-        // The next element for this cursor
+        // The next element for this cursor, always non null
+        // if activated, unless no element available:
         QueueElement current = null;
         // The current sequence number for this cursor,
         // used when inactive or pointing to an element
@@ -570,6 +651,9 @@
         // elements between first and last inclusive:
         QueueElement firstRef = null;
         QueueElement lastRef = null;
+        // This is set to the last block for which
+        // we have requested a load:
+        long lastBlockRequest = -1;
 
         // Each cursor can optionally be memory limited
         // When the limiter is set the cursor is able to
@@ -578,9 +662,20 @@
         private final IFlowSizeLimiter<QueueElement> memoryLimiter;
         private final IFlowController<QueueElement> memoryController;
 
-        public Cursor(String name) {
+        // Indicates whether this cursor skips acquired elements
+        private final boolean skipAcquired;
+        // Indicates whether this cursor will page in elements
+        // 
+        private final boolean pageInElements;
+
+        public Cursor(String name, boolean skipAcquired, boolean pageInElements) {
             this.name = name;
-            if (DEFAULT_MEMORY_LIMIT < sizeLimiter.getCapacity()) {
+            this.skipAcquired = skipAcquired;
+            this.pageInElements = pageInElements;
+
+            // Set up a limiter if this cursor pages in elements, and memory
+            // limit is less than the queue size:
+            if (pageInElements && persistencePolicy.isPagingEnabled() && DEFAULT_MEMORY_LIMIT < sizeLimiter.getCapacity()) {
                 memoryLimiter = new SizeLimiter<QueueElement>(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) {
                     public int getElementSize(QueueElement qe) {
                         return qe.size;
@@ -607,17 +702,18 @@
          *            The element for which to check.
          * @return
          */
-        public boolean offer(QueueElement qe) {
+        public final boolean offer(QueueElement qe, ISourceController<?> controller) {
             if (activated && memoryLimiter != null) {
-                if (current == null) {
-                    getNext();
-                }
-                checkPageIn();
+                getNext();
                 if (lastRef != null) {
                     // Return true if we absorbed it:
                     if (qe.sequence <= lastRef.sequence && qe.sequence >= firstRef.sequence) {
                         return true;
                     }
+                    // If our last ref is close to this one reserve the element
+                    else if (qe.getPrevious() == lastRef) {
+                        return addCursorRef(qe, controller);
+                    }
                 }
                 return false;
             }
@@ -631,67 +727,82 @@
             current = null;
         }
 
-        public void activate() {
-            activated = true;
-            getNext();
-        }
-
-        public void deactivate() {
-            activated = false;
-
-            // Release all of our references:
-            long block = -1;
-            while (firstRef != null) {
-                block = firstRef.restoreBlock;
-                firstRef.releaseCursorRef(this);
-                memoryLimiter.remove(firstRef);
-
-                if (firstRef == lastRef) {
-                    loader.releaseBlock(this, block);
-                    firstRef = lastRef = null;
-                } else {
-                    firstRef = firstRef.getNext();
-                    if (firstRef.restoreBlock != block) {
-                        loader.releaseBlock(this, block);
+        public final void activate() {
+            if (!activated) {
+                activated = true;
+                getNext();
+            }
+        }
+
+        public final boolean isActivated() {
+            return activated;
+        }
+
+        public final void deactivate() {
+            if (activated) {
+                // Release all of our references:
+                while (firstRef != null) {
+                    firstRef.releaseHardRef(memoryController);
+
+                    // If we're passing into a new block release the old one:
+                    if (firstRef.isLastInBlock()) {
+                        System.out.println(this + " releasing block:" + firstRef.restoreBlock);
+                        loader.releaseBlock(this, firstRef.restoreBlock);
+                    }
+
+                    if (firstRef == lastRef) {
+                        firstRef = lastRef = null;
+                    } else {
+                        firstRef = firstRef.getNext();
                     }
                 }
-            }
 
-            // Let go of our current ref:
-            current = null;
+                // Release the last requested block:
+                if (persistencePolicy.isPageOutPlaceHolders()) {
+                    loader.releaseBlock(this, lastBlockRequest);
+                }
+
+                lastBlockRequest = -1;
+
+                // Let go of our current ref:
+                current = null;
+                activated = false;
+            }
         }
 
         /**
          * Makes sure elements are paged in
          */
-        private final void checkPageIn() {
+        private final void updatePagingRefs() {
             if (!activated)
                 return;
 
-            if (memoryLimiter != null) {
+            if (pageInElements && memoryLimiter != null) {
 
                 // Release memory references up to our sequence number
-                long block = -1;
                 while (firstRef != null && firstRef.getSequence() < sequence) {
-                    block = firstRef.restoreBlock;
-                    firstRef.releaseCursorRef(this);
-                    memoryLimiter.remove(firstRef);
+                    boolean lastInBlock = firstRef.isLastInBlock();
+                    QueueElement next = firstRef.getNext();
+                    firstRef.releaseHardRef(memoryController);
+
+                    // If we're passing into a new block release the old one:
+                    if (lastInBlock) {
+                        if (DEBUG)
+                            System.out.println(this + " releasing block:" + firstRef.restoreBlock);
 
+                        loader.releaseBlock(this, firstRef.restoreBlock);
+                    }
+
+                    // If we've reached our last ref, null out held refs:
                     if (firstRef == lastRef) {
                         firstRef = lastRef = null;
                     } else {
-                        firstRef = firstRef.getNext();
-                        // If we're passing into a new block
-                        // release the old one:
-                        if (firstRef.restoreBlock != block) {
-                            loader.releaseBlock(this, block);
-                        }
+                        firstRef = next;
                     }
                 }
 
                 // Now add refs for as many elements as we can hold:
                 QueueElement next = null;
-
                 if (lastRef == null) {
                     next = current;
                 } else {
@@ -699,52 +810,66 @@
                 }
 
                 while (next != null && !memoryLimiter.getThrottled()) {
-                    if (next.isLoaded()) {
-                        next.addCursorRef(this);
-                        if (firstRef == null) {
-                            firstRef = next;
-                        }
-                        memoryLimiter.add(next);
-                        lastRef = next;
-                        next = lastRef.getNext();
-                    } else {
-                        // TODO track our currently requested block to avoid
-                        // calling this
-                        // repeatedly
-                        loader.loadBlock(this, next.restoreBlock);
+                    if (!addCursorRef(next, null)) {
                         break;
                     }
+                    next = lastRef.getNext();
                 }
             }
             // Otherwise we still need to ensure the block has been loaded:
             else if (current != null && !current.isLoaded()) {
-                loader.loadBlock(this, current.restoreBlock);
-            }
-        }
-
-        private final void updateSequence(final long newSequence) {
-            this.sequence = newSequence;
-            if (DEBUG && sequence > nextSequenceNumber) {
-                new Exception(this + "cursor overflow").printStackTrace();
+                if (lastBlockRequest != current.restoreBlock) {
+                    if (persistencePolicy.isPageOutPlaceHolders()) {
+                        loader.releaseBlock(this, lastBlockRequest);
+                    }
+                    loader.loadBlock(this, current.restoreBlock);
+                    lastBlockRequest = current.restoreBlock;
+                }
             }
         }
 
         /**
-         * @return true if their is a paged in, unacquired element that is ready
-         *         for dispatch
+         * Keeps the element paged in for this cursor accounting for it in the
+         * cursor's memory limiter. The provided controller is blocked if this
+         * overflows this cursor's limiter.
+         * 
+         * @param qe
+         *            The element to hold in memory.
+         * @param controller
+         *            The controller adding the element.
+         * @return false if the element isn't in memory.
          */
-        public final boolean isReady() {
-            if (!activated)
-                return false;
+        private final boolean addCursorRef(QueueElement qe, ISourceController<?> controller) {
+            // Make sure we have requested the block:
+            if (qe.restoreBlock != lastBlockRequest) {
+                lastBlockRequest = qe.restoreBlock;
+                if (DEBUG)
+                    System.out.println(this + " requesting block:" + lastBlockRequest);
+                loader.loadBlock(this, lastBlockRequest);
+            }
 
-            getNext();
-            // Possible when the queue is empty
-            if (current == null || current.isAcquired() || current.isPagedOut()) {
+            // If the next element isn't loaded then we can't yet
+            // reference it:
+            if (!qe.isLoaded()) {
                 return false;
             }
+
+            qe.addHardRef();
+            if (firstRef == null) {
+                firstRef = qe;
+            }
+            memoryController.add(qe, controller);
+            lastRef = qe;
             return true;
         }
 
+        private final void updateSequence(final long newSequence) {
+            this.sequence = newSequence;
+            if (DEBUG && sequence > nextSequenceNumber) {
+                new Exception(this + "cursor overflow").printStackTrace();
+            }
+        }
+
         /**
          * Sets the cursor to the next sequence number after the provided
          * element:
@@ -754,60 +879,95 @@
 
             if (next != null) {
                 updateSequence(next.sequence);
-                current = next;
+                if (activated) {
+                    current = next;
+                }
             } else {
                 current = null;
                 updateSequence(sequence + 1);
             }
+            updatePagingRefs();
         }
 
+        /**
+         * @return the next available element or null if one is not currently
+         *         available.
+         */
         public final QueueElement getNext() {
-            if (queue.isEmpty() || queue.getTail().sequence < sequence) {
-                current = null;
-                return null;
-            }
-
-            if (queue.getTail().sequence == sequence) {
-                current = queue.getTail();
-            }
 
-            // Get a pointer to the next element
-            if (current == null) {
-                current = queue.upper(sequence, true);
-                if (current == null) {
+            try {
+                if (queue.isEmpty() || queue.getTail().sequence < sequence) {
+                    current = null;
                     return null;
                 }
-            }
 
-            // Skip acquired elements:
-            while (current.isAcquired()) {
-                QueueElement last = current;
-                current = current.getNext();
-
-                // If the next element is null, increment our sequence
-                // and return:
-                if (current == null) {
-                    updateSequence(last.getSequence() + 1);
-                    return null;
+                if (queue.getTail().sequence == sequence) {
+                    current = queue.getTail();
                 }
 
-                // If we're paged out break, this isn't the
-                // next, but it means that we need to page
-                // in:
-                if (current.isPagedOut()) {
-                    break;
+                // Get a pointer to the next element
+                if (current == null || !current.isLinked()) {
+                    current = queue.upper(sequence, true);
+                    if (current == null) {
+                        return null;
+                    }
                 }
-            }
 
-            if (current.sequence < sequence) {
-                return null;
-            } else {
-                updateSequence(current.sequence);
+                // Skip removed elements (and acquired ones if requested)
+                while ((skipAcquired && current.isAcquired()) || current.isDeleted()) {
+                    QueueElement last = current;
+                    current = current.getNext();
+
+                    // If the next element is null, increment our sequence
+                    // and return:
+                    if (current == null) {
+                        updateSequence(last.getSequence() + 1);
+                        return null;
+                    }
+
+                    // Break if we're waiting to load an element
+                    if (!current.isLoaded()) {
+                        break;
+                    }
+
+                    // If we're paged out break, this isn't the
+                    // next, but it means that we need to page
+                    // in:
+                    if (current.isPagedOut() && pageInElements) {
+                        break;
+                    }
+                }
+
+                if (current.sequence < sequence) {
+                    return null;
+                } else {
+                    updateSequence(current.sequence);
+                }
+            } finally {
+                // Don't hold on to a current ref if we aren't activated:
+                if (!activated) {
+                    current = null;
+                }
+                updatePagingRefs();
+            }
+            if (current != null) {
+                // Don't return elements that aren't loaded:
+                if (!current.isLoaded()) {
+                    return null;
+                }
+
+                // Return null if the element isn't yet paged in:
+                if (pageInElements && current.isPagedOut()) {
+                    return null;
+                }
             }
-            checkPageIn();
             return current;
         }
 
+        public long getCurrentSequeunce() {
+            return sequence;
+        }
+
         public int compareTo(Cursor o) {
             if (o.sequence > sequence) {
                 return -1;
@@ -818,46 +978,117 @@
             }
         }
 
+        /**
+         * @return true if there is a paged in, unacquired element that is ready
+         *         for dispatch
+         */
+        public final boolean isReady() {
+            if (!activated)
+                return false;
+
+            if (getNext() == null) {
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Notifies the registered ready listener, if any, when this cursor has
+         * an element that is ready for dispatch.
+         */
+        public void onElementsLoaded() {
+            if (readyListener != null && isReady()) {
+                // Trace output only, guarded like the other debug messages:
+                if (DEBUG)
+                    System.out.println(this + " notifying ready");
+                readyListener.onElementReady();
+            }
+        }
+
+        /**
+         * @param cursorReadyListener
+         */
+        public void setCursorReadyListener(CursorReadyListener cursorReadyListener) {
+            readyListener = cursorReadyListener;
+        }
+
+        /**
+         * @return true if the cursor has passed the end of the queue.
+         */
+        public boolean atEnd() {
+            // An empty queue has nothing left to traverse:
+            if (queue.isEmpty()) {
+                return true;
+            }
+
+            QueueElement tail = queue.getTail();
+            // Can't be at the end if the tail isn't loaded:
+            if (!tail.isLoaded()) {
+                return false;
+            }
+
+            if (tail.getSequence() < this.sequence) {
+                return true;
+            }
+
+            return false;
+        }
+
         public String toString() {
             return "Cursor: " + sequence + " [" + name + "]";
         }
     }
 
-    class QueueElement extends SortedLinkedListNode<QueueElement> implements Subscription.SubscriptionDeliveryCallback {
-
-        V elem;
+    class QueueElement extends SortedLinkedListNode<QueueElement> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
 
-        SubscriptionContext owner;
         final long sequence;
-        int size = -1;
+        final long restoreBlock;
 
-        long restoreBlock;
-
-        // When a queue element is paged out, the first element
-        // in a range of paged out elements keeps track of the count
-        // and size of paged out elements.
-        int pagedOutCount = 0;
-        long pagedOutSize = 0;
+        V elem;
+        int size = -1;
+        long expiration = -1;
+        boolean redelivered = false;
 
-        // Cursors that have referenced this element,
         // When this drops to 0 we can page out the
         // element.
-        int cursorRefs = 0;
-        boolean acked = false;
+        int hardRefs = 0;
+
+        // When this drops to 0 we can unload the element
+        // providing it isn't in the load queue:
+        int softRefs = 0;
+
+        // Indicates whether this element is loaded or a placeholder:
+        boolean loaded = true;
+
+        // Indicates that we have requested a save for the element
+        boolean savePending = false;
+        // Indicates whether the element has been saved in the store.
+        boolean saved = false;
+
+        boolean deleted = false;
+        SubscriptionContext owner;
 
         public QueueElement(V elem, long sequence) {
             this.elem = elem;
 
             if (elem != null) {
                 size = sizeLimiter.getElementSize(elem);
+                expiration = expirationMapper.map(elem);
             }
             this.sequence = sequence;
             this.restoreBlock = sequence / RESTORE_BLOCK_SIZE;
         }
 
+        /**
+         * @return true if this element has been deleted:
+         */
+        public boolean isDeleted() {
+            return deleted;
+        }
+
         public QueueElement(RestoredElement<V> restored) throws Exception {
             this(restored.getElement(), restored.getSequenceNumber());
             this.size = restored.getElementSize();
+            this.expiration = restored.getExpiration();
+            saved = true;
+            savePending = false;
         }
 
         @Override
@@ -865,23 +1096,39 @@
             return sequence;
         }
 
-        public final void addCursorRef(Cursor cursor) {
-            cursorRefs++;
-            if (elem == null) {
+        public final void addHardRef() {
+            hardRefs++;
+            // Page in the element (providing it wasn't removed):
+            if (elem == null && !deleted) {
                 // If this is the first request for this
                 // element request a load:
-                if (cursorRefs == 1) {
+                if (hardRefs == 1) {
                     loader.pageIn(this);
                 }
             }
         }
 
-        public final void releaseCursorRef(Cursor cursor) {
-            cursorRefs--;
-            if (cursorRefs == 0) {
-                // TODO need a controller:
-                unload(cursor.memoryController);
+        public final void releaseHardRef(IFlowController<QueueElement> controller) {
+            hardRefs--;
+            if (hardRefs == 0) {
+                unload(controller);
+            }
+            if (controller != null) {
+                controller.elementDispatched(this);
             }
+            assert hardRefs >= 0;
+        }
+
+        public final void addSoftRef() {
+            softRefs++;
+        }
+
+        public final void releaseSoftRef() {
+            softRefs--;
+            if (softRefs == 0) {
+                unload(null);
+            }
+            assert softRefs >= 0;
         }
 
         public final void setAcquired(SubscriptionContext owner) {
@@ -890,86 +1137,104 @@
 
         public final void acknowledge() {
             synchronized (mutex) {
-                acked = true;
+                delete();
+            }
+        }
+
+        public final void delete() {
+            if (!deleted) {
+                deleted = true;
                 owner = null;
                 totalQueueCount--;
+                if (isExpirable()) {
+                    expirator.elementRemoved(this);
+                }
                 sizeController.elementDispatched(elem);
-                if (store.isElemPersistent(elem) || store.isFromStore(elem)) {
+                if (saved) {
                     store.deleteQueueElement(queueDescriptor, elem);
                 }
+                elem = null;
                 unload(null);
             }
         }
 
         public final void unacquire(ISourceController<?> source) {
-            // TODO reenqueue and update cursors back to this position.
-            // If there are subscriptions with selectors this could get
-            // tricky to avoid reevaluating already evaluated selectors.
-            throw new UnsupportedOperationException();
+            owner = null;
+            if (isExpired()) {
+                acknowledge();
+            } else {
+                // TODO reset all cursors beyond this sequence number
+                // back to this element
+                throw new UnsupportedOperationException("Not yet implemented");
+            }
         }
 
         /**
          * Attempts to unlink this element from the queue
          */
         public final void unload(ISourceController<?> controller) {
-            // Unlink this element from the queue. Don't unlink
-            // if the element is acquired or if it is in the load
-            // queue.
-            if (cursorRefs > 0 || loader.inLoadQueue(this) || owner != null) {
+
+            // Can't unlink if there is a cursor ref, the cursor
+            // needs this element to decrement its limiter space
+            if (hardRefs > 0) {
+                return;
+            }
+
+            // If the element didn't require persistence on enqueue, then
+            // we'll need to save it now before paging it out.
+            // Note that we don't page out the element if it has an owner
+            // because we need the element when we issue the delete.
+            if (owner == null && elem != null && !persistencePolicy.isPersistent(elem)) {
+                save(controller, true);
+                if (DEBUG)
+                    System.out.println("Paged out element: " + this);
+                elem = null;
+            }
+
+            // If save is pending don't unload until the save has completed
+            if (savePending) {
                 return;
             }
 
             QueueElement next = getNext();
             QueueElement prev = getPrevious();
 
-            // If acked unlink this element from the queue, and link
-            // together adjacent paged out entries:
-            if (acked) {
-                unlink();
-                // If both next and previous entries are unloaded,
-                // then collapse them:
-                if (next != null && prev != null && next.pagedOutCount > 0 && prev.pagedOutCount > 0) {
-                    prev.pagedOutCount += next.pagedOutCount;
-                    prev.pagedOutSize += next.pagedOutSize;
-                    next.unlink();
-                }
-            }
-            // Otherwise page out this element
-            else if (elem != null) {
-                // If the element is not persistent then we'll need to request a
-                // save:
-                if (!store.isFromStore(elem) && !store.isElemPersistent(elem)) {
-                    try {
-                        store.persistQueueElement(queueDescriptor, controller, elem, sequence, false);
-                    } catch (Exception e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
+            // See if we can unload this element:
+            // Don't unload the element if it is:
+            // -Has an owner (we keep the element in memory so we don't
+            // forget about the owner).
+            // -If there are soft references to it
+            // -Or it is in the load queue
+            if (owner == null && softRefs == 0 && !loader.inLoadQueue(this)) {
+                // If deleted unlink this element from the queue, and link
+                // together adjacent paged out entries:
+                if (deleted) {
+                    unlink();
+                    // If both next and previous entries are unloaded,
+                    // then collapse them:
+                    if (next != null && prev != null && !next.isLoaded() && !prev.isLoaded()) {
+                        next.unlink();
                     }
-                }
+                } else {
 
-                pagedOutCount = 1;
-                pagedOutSize = size;
-                elem = null;
+                    loaded = false;
 
-                // If the next element is unloaded
-                // replace it with this
-                if (next != null && next.pagedOutCount > 0) {
-                    pagedOutCount += next.pagedOutCount;
-                    pagedOutSize += next.pagedOutSize;
-                    next.unlink();
-                }
+                    // If the next element is unloaded
+                    // replace it with this
+                    if (next != null && !next.isLoaded()) {
+                        next.unlink();
+                    }
 
-                // If the previous elem is paged out unlink this
-                // entry:
-                if (prev != null && prev.pagedOutCount > 0) {
-                    prev.pagedOutCount += pagedOutCount;
-                    prev.pagedOutSize += pagedOutSize;
-                    unlink();
+                    // If the previous elem is unloaded unlink this
+                    // entry:
+                    if (prev != null && !prev.isLoaded()) {
+                        unlink();
+                    }
                 }
             }
 
             if (DEBUG)
-                System.out.println("Paged out element: " + this);
+                System.out.println("Unloaded element: " + this);
 
         }
 
@@ -978,76 +1243,361 @@
          * 
          * @param qe
          *            The paged in element to relink.
+         * @throws Exception
+         *             If there was an error creating the loaded element:
          */
-        public final QueueElement loadAfter(QueueElement qe, long nextSequence) {
-            QueueElement ret = qe;
-            // See if we have a pointer to a paged out element:
-            if (sequence == qe.sequence) {
-                // Already paged in? Shouldn't be.
-                if (!isPagedOut()) {
-                    // throw new
-                    // IllegalStateException("Can't page in an already paged in element");
-                    if (DEBUG) {
-                        System.out.println("Paged in already paged in element: " + this);
-                    }
-                } else {
-                    // Otherwise set this element to the paged in one
-                    // and add a new QueueElement to hold any additional
-                    // paged out elements:
-                    elem = qe.elem;
-                    size = qe.size;
-                    pagedOutCount--;
-                    if (pagedOutCount > 0) {
-                        if (nextSequence == -1) {
-                            throw new IllegalStateException("Shouldn't have paged out elements at the end of the queue");
-                        }
-                        qe = new QueueElement(null, nextSequence);
-                        qe.pagedOutCount = pagedOutCount;
-                        qe.pagedOutSize = pagedOutSize - size;
-                        pagedOutCount = 0;
-                        pagedOutSize = 0;
-                        try {
-                            this.linkAfter(qe);
-                        } catch (Exception e) {
-                            e.printStackTrace();
+        public final QueueElement loadAfter(RestoredElement<V> re) throws Exception {
+
+            QueueElement ret = null;
+
+            // See if this element represents the one being loaded:
+            if (sequence == re.getSequenceNumber()) {
+                ret = this;
+                // If this isn't yet loaded
+                if (!isLoaded()) {
+
+                    loaded = true;
+                    // Add a place holder to the next element if it's not
+                    // already
+                    // loaded:
+                    if (re.getNextSequenceNumber() != -1) {
+                        // Otherwise if our next pointer doesn't match the
+                        // next restored number:
+                        QueueElement next = getNext();
+                        if (next == null || next.sequence != re.getNextSequenceNumber()) {
+                            next = new QueueElement(null, re.getNextSequenceNumber());
+                            next.loaded = false;
+                            this.linkAfter(next);
                         }
                     }
+                    this.size = re.getElementSize();
                 }
-                ret = this;
+
+                // If we're paged out set our elem to the restored one:
+                if (isPagedOut() && !deleted) {
+                    this.elem = re.getElement();
+                }
+                saved = true;
+                savePending = false;
+
             } else {
+                ret = new QueueElement(re);
                 // Otherwise simply link this element into the list:
-                queue.add(qe);
-                // Decrement pagedOutCount counter of previous element:
-                if (qe.prev != null && qe.prev.pagedOutCount > 0) {
-                    if (qe.prev.pagedOutCount > 1) {
-                        throw new IllegalStateException("Skipped paged in element");
-                    }
-                    pagedOutCount = qe.pagedOutCount - 1;
-                    qe.prev.pagedOutCount = 0;
-                    qe.prev.pagedOutSize = 0;
-                }
+                queue.add(ret);
             }
 
             if (DEBUG)
-                System.out.println("Paged in element: " + this);
-
+                System.out.println("Loaded element: " + ret);
             return ret;
         }
 
+        public final boolean isFirstInBlock() {
+            if (isHeadNode()) {
+                return true;
+            } else {
+                return prev.restoreBlock != restoreBlock;
+            }
+        }
+
+        public final boolean isLastInBlock() {
+            if (isTailNode()) {
+                return nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock;
+            } else {
+                return next.restoreBlock != restoreBlock;
+            }
+        }
+
         public final boolean isPagedOut() {
-            return elem == null;
+            return elem == null || !isLoaded();
         }
 
         public final boolean isLoaded() {
-            return pagedOutCount == 0;
+            return loaded;
         }
 
         public final boolean isAcquired() {
-            return owner != null;
+            return owner != null || deleted;
+        }
+
+        public final long getExpiration() {
+            return expiration;
+        }
+
+        public boolean isExpirable() {
+            return expiration > 0;
+        }
+
+        public final boolean isExpired() {
+            return expiration > 0 && System.currentTimeMillis() > expiration;
+        }
+
+        public final void save(ISourceController<?> controller, boolean delayable) {
+            if (!saved) {
+                store.persistQueueElement(this, controller, delayable);
+                saved = true;
+
+                // If paging is enabled we can't unload the element until it
+                // is saved, otherwise there is no guarantee that it will be
+                // in the store on a subsequent load requests because the
+                // save is done asynchronously.
+                if (persistencePolicy.isPagingEnabled()) {
+                    savePending = true;
+                }
+            }
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.SaveableQueueElement#getElement
+         * ()
+         */
+        public final V getElement() {
+            return elem;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.QueueStore.SaveableQueueElement#
+         * getSequenceNumber()
+         */
+        public final long getSequenceNumber() {
+            return sequence;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.SaveableQueueElement#notifySave
+         * ()
+         */
+        public void notifySave() {
+            synchronized (mutex) {
+                // Unload if we haven't already:
+                if (isLinked()) {
+                    savePending = false;
+                    unload(null);
+                }
+            }
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.SaveableQueueElement#requestSaveNotify
+         * ()
+         */
+        public boolean requestSaveNotify() {
+            return savePending;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.QueueStore.SaveableQueueElement#
+         * getQueueDescriptor()
+         */
+        public QueueDescriptor getQueueDescriptor() {
+            return queueDescriptor;
         }
 
         public String toString() {
-            return "QueueElement " + sequence + " pagedOutCount: " + pagedOutCount + " owner: " + owner;
+            return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
+        }
+
+    }
+
+    private class Expirator {
+
+        private final Cursor cursor = new Cursor("Expirator", false, false);
+        // Number of expirable elements in the queue:
+        private int count = 0;
+
+        private boolean loaded = false;
+        private long recoverySequence;
+        private long lastRecoverdSequence;
+
+        private static final int MAX_CACHE_SIZE = 500;
+        private long uncachedMin = Long.MAX_VALUE;
+        TreeMap<Long, HashSet<QueueElement>> expirationCache = new TreeMap<Long, HashSet<QueueElement>>();
+        private int cacheSize = 0;
+
+        public final boolean needsDispatch() {
+            // If we have expiration candidates or are scanning the
+            // queue request dispatch:
+            return hasExpirationCandidates() || cursor.isReady();
+        }
+
+        public void start() {
+            if (getEnqueuedCount() == 0) {
+                loaded = true;
+            } else {
+                // Otherwise open a cursor and scan the queue up to
+                // the current sequence number checking for expirable
+                // elements:
+                recoverySequence = nextSequenceNumber;
+                cursor.reset(queue.getHead().sequence);
+                cursor.activate();
+                cursor.setCursorReadyListener(new CursorReadyListener() {
+                    public void onElementReady() {
+                        synchronized (mutex) {
+                            notifyReady();
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         * Scans the queue for expirable elements while the expirator's cursor
+         * is active, then acknowledges every cached element whose expiration
+         * time has passed.
+         */
+        public void dispatch() {
+            final long now = System.currentTimeMillis();
+
+            // If there are uncached elements in the queue that are ready for
+            // expiration then rescan the queue. The cursor has to be
+            // reactivated because isReady() is always false while it is
+            // deactivated:
+            if (!cursor.isActivated() && uncachedMin < now) {
+                uncachedMin = Long.MAX_VALUE;
+                cursor.reset(0);
+                cursor.activate();
+            }
+
+            // Scan the queue looking for expirables:
+            if (cursor.isReady()) {
+                QueueElement qe = cursor.getNext();
+                while (qe != null) {
+                    if (!loaded) {
+                        if (qe.sequence < recoverySequence) {
+                            lastRecoverdSequence = qe.sequence;
+                            elementAdded(qe);
+                        }
+                    } else if (qe.isExpired()) {
+                        qe.acknowledge();
+                    } else if (qe.isExpirable()) {
+                        // Only elements that can expire belong in the cache:
+                        addToCache(qe);
+                    }
+
+                    // Always advance the cursor, otherwise the scan would
+                    // spin on the same element forever:
+                    cursor.skip(qe);
+                    qe = cursor.getNext();
+                }
+
+                // Finished loading:
+                if (!loaded && cursor.getCurrentSequeunce() >= recoverySequence) {
+                    System.out.println(this + " Queue Load Complete");
+                    loaded = true;
+                    cursor.deactivate();
+                } else if (cursor.atEnd()) {
+                    cursor.deactivate();
+                }
+            }
+
+            // Expire cached elements in expiration order, stopping at the
+            // first entry that hasn't expired yet:
+            while (!expirationCache.isEmpty()) {
+                Entry<Long, HashSet<QueueElement>> first = expirationCache.firstEntry();
+                if (first.getKey() >= now) {
+                    break;
+                }
+
+                // Detach the entry before acknowledging: acknowledge() calls
+                // back into elementRemoved(), which must not modify the set
+                // that is being iterated.
+                expirationCache.remove(first.getKey());
+                for (QueueElement qe : first.getValue()) {
+                    cacheSize--;
+                    // Acknowledge before dropping the soft ref: releasing the
+                    // ref first can trigger unload(), which may save the
+                    // element and null its payload before delete() uses it.
+                    qe.acknowledge();
+                    qe.releaseSoftRef();
+                }
+            }
+        }
+
+        public void elementAdded(QueueElement qe) {
+            if (qe.isExpirable() && !qe.isDeleted()) {
+                count++;
+                if (qe.isExpired()) {
+                    qe.acknowledge();
+                } else {
+                    addToCache(qe);
+                }
+            }
+        }
+
+        /**
+         * Caches the element by expiration time so that it can be expired
+         * without rescanning the queue. When the cache is full the element is
+         * only cached if it expires before the latest cached expiration, in
+         * which case one element with the latest expiration is evicted. The
+         * smallest expiration left out of the cache is tracked in uncachedMin.
+         * 
+         * @param qe
+         *            The element to cache.
+         */
+        private void addToCache(QueueElement qe) {
+            // Nothing to do if the element is already cached (a rescan of the
+            // queue revisits cached elements); counting it again would take an
+            // extra soft reference and inflate cacheSize:
+            HashSet<QueueElement> bucket = expirationCache.get(qe.expiration);
+            if (bucket != null && bucket.contains(qe)) {
+                return;
+            }
+
+            // See if we should cache it, evicting entries if possible
+            if (cacheSize >= MAX_CACHE_SIZE && !expirationCache.isEmpty()) {
+                Entry<Long, HashSet<QueueElement>> last = expirationCache.lastEntry();
+                if (last.getKey() <= qe.expiration) {
+                    // Keep track of the minimum uncached value:
+                    if (qe.expiration < uncachedMin) {
+                        uncachedMin = qe.expiration;
+                    }
+                    return;
+                }
+
+                // Evict one element with the latest expiration:
+                Iterator<QueueElement> i = last.getValue().iterator();
+                removeFromCache(i.next());
+
+                if (last.getKey() <= uncachedMin) {
+                    // Keep track of the minimum uncached value:
+                    uncachedMin = last.getKey();
+                }
+            }
+
+            // Elements sharing an expiration time share a bucket:
+            if (bucket == null) {
+                bucket = new HashSet<QueueElement>();
+                expirationCache.put(qe.expiration, bucket);
+            }
+            bucket.add(qe);
+            qe.addSoftRef();
+            cacheSize++;
+        }
+
+        /**
+         * Removes the element from the expiration cache and releases the soft
+         * reference that was taken when it was cached. Does nothing if the
+         * element isn't cached.
+         * 
+         * @param qe
+         *            The element to remove.
+         */
+        private final void removeFromCache(QueueElement qe) {
+            HashSet<QueueElement> bucket = expirationCache.get(qe.expiration);
+            // The buckets hold the elements themselves, not their sequence
+            // numbers:
+            if (bucket != null && bucket.remove(qe)) {
+                cacheSize--;
+                // NOTE(review): delete() calls elementRemoved() before it is
+                // done with elem, and releasing the last soft ref runs
+                // unload() -- confirm the ordering in delete().
+                qe.releaseSoftRef();
+                if (bucket.isEmpty()) {
+                    // The cache is keyed by expiration time:
+                    expirationCache.remove(qe.expiration);
+                }
+            }
+        }
+
+        /**
+         * Updates the expirable element count and the expiration cache when an
+         * element is removed from the queue.
+         * 
+         * @param qe
+         *            The removed element.
+         */
+        public void elementRemoved(QueueElement qe) {
+            // While loading, ignore elements that haven't been seen yet.
+            if (!loaded && qe.sequence < recoverySequence && qe.sequence > lastRecoverdSequence) {
+                return;
+            }
+
+            if (qe.isExpirable()) {
+                count--;
+                removeFromCache(qe);
+                // Removing the last expirable element legitimately leaves 0:
+                assert count >= 0;
+            }
+        }
+
+        public boolean hasExpirationCandidates() {
+            return !loaded || hasExpirables();
+        }
+
+        public boolean hasExpirables() {
+            if (count == 0) {
+                return false;
+            } else {
+                long now = System.currentTimeMillis();
+                if (now > uncachedMin) {
+                    return true;
+                } else if (!expirationCache.isEmpty()) {
+                    return now > expirationCache.firstKey();
+                }
+
+                return false;
+            }
+        }
+
+        public String toString() {
+            return "Expirator for " + SharedQueue.this + " expirable " + count;
         }
     }
 
@@ -1068,6 +1618,7 @@
 
         private LinkedList<QueueStore.RestoredElement<V>> fromDatabase = new LinkedList<QueueStore.RestoredElement<V>>();
         private final HashMap<Long, HashSet<Cursor>> requestedBlocks = new HashMap<Long, HashSet<Cursor>>();
+        private final HashSet<Cursor> pagingCursors = new HashSet<Cursor>();
 
         public boolean inLoadQueue(QueueElement queueElement) {
             return requestedBlocks.containsKey(queueElement.restoreBlock);
@@ -1091,15 +1642,16 @@
          */
         public final void elementAdded(QueueElement qe, ISourceController<V> source) {
 
-            if (useMemoryLimiter) {
+            if (persistencePolicy.isPagingEnabled()) {
 
-                // Check with the live cursor to see if it is willing to
+                // Check with the shared cursor to see if it is willing to
                 // absorb the element. If so that's good enough.
-                if (liveCursor.offer(qe)) {
+                if (sharedCursor.offer(qe, source)) {
                     return;
                 }
 
-                // Find a cursor willing to accept the element:
+                // Otherwise check with any other open cursor to see if
+                // it can take the element:
                 HashSet<Cursor> active = requestedBlocks.get(qe.sequence);
 
                 // If there are none, unload the element:
@@ -1112,12 +1664,12 @@
                 // element:
                 boolean accepted = false;
                 for (Cursor cursor : active) {
-                    // Already checked the live cursor above:
-                    if (cursor == liveCursor) {
+                    // Already checked the shared cursor above:
+                    if (cursor == sharedCursor) {
                         continue;
                     }
 
-                    if (cursor.offer(qe)) {
+                    if (cursor.offer(qe, source)) {
                         accepted = true;
                     }
                 }
@@ -1154,32 +1706,35 @@
                 if (DEBUG)
                     System.out.println(cursor + " requesting restoreBlock:" + block + " from " + firstSequence + " to " + maxSequence + " max: " + maxCount + " queueMax: " + nextSequenceNumber);
 
-                // If we are memory limited only pull in queue records, don't
-                // bring in the payload.
+                // If paging is enabled only pull in queue records, don't bring
+                // in the payload.
                 // Each active cursor will have to pull in messages based on
                 // available memory.
-                store.restoreQueueElements(queueDescriptor, useMemoryLimiter, firstSequence, maxSequence, maxCount, this);
+                store.restoreQueueElements(queueDescriptor, persistencePolicy.isPagingEnabled(), firstSequence, maxSequence, maxCount, this);
             }
             cursors.add(cursor);
         }
 
         public void releaseBlock(Cursor cursor, long block) {
+            // Don't do anything if we don't page out placeholders
+            if (!persistencePolicy.isPageOutPlaceHolders()) {
+                return;
+            }
             HashSet<Cursor> cursors = requestedBlocks.get(block);
             if (cursors == null) {
                 if (true || DEBUG)
-                    System.out.println(this + " removeBlockInterest, no consumers " + cursor);
+                    System.out.println(this + " removeBlockInterest " + block + ", no cursors" + cursor);
             } else {
                 if (cursors.remove(cursor)) {
                     if (cursors.isEmpty()) {
                         requestedBlocks.remove(block);
-                        // If this is the last cursor active in this block page
-                        // out the block:
-                        if (useMemoryLimiter) {
+                        // If this is the last cursor active in this block
+                        // unload the block:
+                        if (persistencePolicy.isPagingEnabled()) {
                             QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
                             while (qe != null && qe.restoreBlock == block) {
                                 QueueElement next = qe.getNext();
-                                // TODO use the cursor's flow controller:
-                                qe.unload(null);
+                                qe.unload(cursor.memoryController);
                                 qe = next;
                             }
                         }
@@ -1189,15 +1744,13 @@
                         System.out.println(this + " removeBlockInterest, no cursor " + cursor);
                 }
             }
-
         }
 
         /**
-         * Returns loaded messages or null if none have been loaded.
+         * Adds elements loaded from the store to the queue.
          * 
-         * @throws IOException
          */
-        final void processPageInRequests() {
+        final void processLoadRequests() {
             LinkedList<RestoredElement<V>> restoredElems = null;
             synchronized (fromDatabase) {
                 if (fromDatabase.isEmpty()) {
@@ -1209,23 +1762,26 @@
 
             // Process restored messages:
             if (restoredElems != null) {
-                // boolean trailingRestore = false;
-                for (QueueStore.RestoredElement<V> restored : restoredElems) {
+                for (RestoredElement<V> restored : restoredElems) {
                     try {
-                        // V delivery = restored.getElement();
-                        // TODO Might be better to change the loadAfter
-                        // signature to directly take the RestoredElement:
-                        // This would avoid creating an QueueElement that might
-                        // not be needed if the element is already paged it:
-                        QueueElement qe = new QueueElement(restored);
-                        QueueElement lower = queue.lower(qe.sequence, true);
-                        qe = lower.loadAfter(qe, restored.getNextSequenceNumber());
-                        loader.elementLoaded(qe);
-
-                        // If we are memory limited remove the request block
-                        // entry
-                        // as soon as we load the block:
-                        if (!useMemoryLimiter) {
+
+                        QueueElement qe = queue.lower(restored.getSequenceNumber(), true);
+
+                        // If we don't have a paged out place holder for this
+                        // element
+                        // it must have been deleted:
+                        if (qe == null) {
+                            System.out.println("Loaded non-existent element: " + restored.getSequenceNumber());
+                            continue;
+                        }
+
+                        qe = qe.loadAfter(restored);
+                        elementLoaded(qe);
+
+                        // If we don't page out place holders we needn't track
+                        // block
+                        // interest once the block is loaded.
+                        if (!persistencePolicy.isPageOutPlaceHolders()) {
                             requestedBlocks.remove(qe.restoreBlock);
                         }
 
@@ -1239,9 +1795,10 @@
                 }
 
                 // Add restoring consumers back to trailing consumers:
-                if (!restoringConsumers.isEmpty()) {
-                    trailingConsumers.addFirst(restoringConsumers);
-                }
+                for (Cursor paging : pagingCursors)
+                    paging.onElementsLoaded();
+
+                pagingCursors.clear();
             }
         }
 
@@ -1273,6 +1830,21 @@
         this.store = store;
     }
 
+    public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) { // Policy consulted for the paging decisions.
+        this.persistencePolicy = persistencePolicy;
+    }
+
+    /**
+     * Sets the mapper stored in this queue's expirationMapper field.
+     * NOTE(review): presumably used to derive an element's expiration time
+     * from the element itself -- confirm against the mapper's call sites.
+     * 
+     * @see org.apache.activemq.queue.IQueue#setExpirationMapper(org.apache.activemq.util.Mapper)
+     */
+    public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
+        this.expirationMapper = expirationMapper;
+    }
+
     public String toString() {
         return "SharedQueue: " + getResourceName();
     }
@@ -1289,5 +1861,4 @@
     public IFlowController<V> getFlowControler() {
         return sizeController;
     }
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Mon May 11 16:25:10 2009
@@ -134,7 +134,6 @@
         super.onFlowOpened(sinkController);
     }
 
-
     public int getEnqueuedCount() {
         synchronized (mutex) {
             return store.size();
@@ -146,7 +145,7 @@
             return limiter.getSize();
         }
     }
-    
+
     public synchronized void start() {
         if (!started) {
             started = true;
@@ -161,11 +160,23 @@
     }
 
     public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
-        // TODO - this queue is not persistent, so we can ignore this.
+        // this queue is not persistent, so we can ignore this.
     }
-    
+
     public void setStore(QueueStore<K, V> store) {
-        //No-op
+        // No-op
+    }
+
+    public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
+        // No-op: this queue is not persistent, so the policy is deliberately ignored.
+    }
+
+    
+    /* No-op counterpart of the IQueue method below; the supplied mapper is ignored.
+     * @see org.apache.activemq.queue.IQueue#setExpirationMapper(org.apache.activemq.util.Mapper)
+     */
+    public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
+        // Not implemented: the mapper is neither stored nor used here.
     }
 
     protected final ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
@@ -273,7 +284,7 @@
     public QueueStore.QueueDescriptor getDescriptor() {
         return queueDescriptor;
     }
-    
+
     public boolean pollingDispatch() {
 
         // System.out.println("polling dispatch");
@@ -326,7 +337,7 @@
             synchronized (mutex) {
                 if (accepted) {
                     subNode.cursorNext();
-                    if (subNode.subscription.isPreAcquired() && subNode.subscription.isRemoveOnDispatch()) {
+                    if (/*subNode.subscription.isPreAcquired() &&*/ subNode.subscription.isRemoveOnDispatch()) {
                         StoreNode<K, V> removed = store.remove(storeNode.getKey());
                         assert removed != null : "Since the node was aquired.. it should not have been removed by anyone else.";
                         sinkController.elementDispatched(storeNode.getValue());
@@ -406,23 +417,23 @@
                     continue;
                 }
 
-                if (subscription.isPreAcquired()) {
+                //if (subscription.isPreAcquired()) {
                     if (elemNode.acquire(subscription)) {
                         return elemNode;
                     } else {
                         cursor.next();
                         continue;
                     }
-                }
+                //}
             }
             cursor = null;
             return null;
         }
 
         public void cursorUnPeek(StoreNode<K, V> node) {
-            if (subscription.isPreAcquired()) {
+            //if (subscription.isPreAcquired()) {
                 node.unacquire();
-            }
+            //}
         }
 
         @Override
@@ -447,7 +458,7 @@
     public FlowController<V> getFlowControler() {
         return this.sinkController;
     }
-    
+
     public interface StoreNode<K, V> {
 
         public boolean acquire(Subscription<V> ownerId);
@@ -466,9 +477,8 @@
         public void setNext(StoreNode<K, V> node);
 
     }
-    
-    private class TreeMemoryStore
-    {
+
+    private class TreeMemoryStore {
         AtomicLong counter = new AtomicLong();
 
         class MemoryStoreNode implements StoreNode<K, V> {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java Mon May 11 16:25:10 2009
@@ -35,7 +35,7 @@
          * the element and that it should be placed back on the queue. 
          * 
          * The provided source controller will be blocked if there 
-         * is not enough space available on the sub queue to
+         * is not enough space available on the queue to
          * reenqueue the element.
          * 
          * It is illegal to call this method after a prior call to 
@@ -59,8 +59,6 @@
      */
     public boolean isBrowser();
 
-    public boolean isPreAcquired();
-
     /**
      * Returns true if the Subscription has a selector. If true
      * is returned the {@link #matches(Object)} will be called

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Mon May 11 16:25:10 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
@@ -80,6 +81,8 @@
         return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
     }
 
+    protected int consumerStartDelay = 0;
+
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
         dispatcher.start();
@@ -120,7 +123,6 @@
         stopServices();
     }
 
-    
     public void testSharedQueue_1_1_1() throws Exception {
         startServices();
         try {
@@ -134,6 +136,19 @@
         }
     }
 
+    public void testSharedQueue_1_1_1_Restore() throws Exception { // Runs doTest() with createQueues(1), createProducers(1), createConsumers(1).
+        startServices();
+        try {
+            createQueues(1);
+            createProducers(1);
+            createConsumers(1);
+            doTest();
+            // NOTE(review): nothing restore-specific is configured here (consumerStartDelay is not set) -- confirm intent.
+        } finally {
+            cleanup();
+        }
+    }
+
     public void testSharedQueue_10_10_10() throws Exception {
         startServices();
         try {
@@ -146,7 +161,7 @@
             cleanup();
         }
     }
-    
+
     public void testSharedQueue_10_1_10() throws Exception {
         startServices();
         try {
@@ -159,8 +174,7 @@
             cleanup();
         }
     }
-    
-    
+
     public void testSharedQueue_10_1_1() throws Exception {
         startServices();
         try {
@@ -173,7 +187,7 @@
             cleanup();
         }
     }
-    
+
     public void testSharedQueue_1_1_10() throws Exception {
         startServices();
         try {
@@ -196,11 +210,26 @@
                 queue.start();
             }
     
-            // Start consumers:
-            for (Consumer consumer : consumers) {
-                consumer.start();
+            Runnable startConsumers = new Runnable()
+            {
+                public void run()
+                {
+                    // Start consumers:
+                    for (Consumer consumer : consumers) {
+                        consumer.start();
+                    }
+                }
+            };
+            
+            if(consumerStartDelay > 0)
+            {
+                dispatcher.schedule(startConsumers, consumerStartDelay, TimeUnit.SECONDS);
             }
-    
+            else
+            {
+                startConsumers.run();
+            }
+            
             // Start producers:
             for (Producer producer : producers) {
                 producer.start();
@@ -370,9 +399,8 @@
         public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> controller) {
             dispatchContext.requestDispatch();
         }
-        
-        public String toString()
-        {
+
+        public String toString() {
             return name + " on " + targetQueue.getResourceName();
         }
     }
@@ -458,9 +486,8 @@
         public boolean match(MessageDelivery message) {
             return true;
         }
-        
-        public String toString()
-        {
+
+        public String toString() {
             return name + " on " + sourceQueue.getResourceName();
         }
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Mon May 11 16:25:10 2009
@@ -9,6 +9,7 @@
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.PersistencePolicy;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SharedPriorityQueue;
 import org.apache.activemq.queue.SharedQueue;
@@ -25,7 +26,8 @@
     private Mapper<Integer, Message> partitionMapper;
     private Mapper<Long, Message> keyExtractor;
     private final MockStoreAdapater store = new MockStoreAdapater();
-
+    private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
+    
     private IQueue<Long, Message> createQueue() {
 
         if (partitionMapper != null) {
@@ -38,6 +40,7 @@
             queue.setPartitionMapper(partitionMapper);
             queue.setResourceName(destination.getName().toString());
             queue.setStore(store);
+            queue.setPersistencePolicy(NO_PERSISTENCE);
             queue.initialize(0, 0, 0, 0);
             return queue;
         } else {
@@ -54,6 +57,7 @@
             queue.setAutoRelease(true);
             queue.setDispatcher(broker.getDispatcher());
             queue.setStore(store);
+            queue.setPersistencePolicy(NO_PERSISTENCE);
             queue.initialize(0, 0, 0, 0);
             return queue;
         } else {
@@ -63,6 +67,7 @@
             queue.setAutoRelease(true);
             queue.setDispatcher(broker.getDispatcher());
             queue.setStore(store);
+            queue.setPersistencePolicy(NO_PERSISTENCE);
             queue.initialize(0, 0, 0, 0);
             return queue;
         }
@@ -78,9 +83,6 @@
 
     public final void addConsumer(final DeliveryTarget dt) {
         Subscription<Message> sub = new Subscription<Message>() {
-            public boolean isPreAcquired() {
-                return true;
-            }
             
             public boolean isBrowser() {
                 return false;
@@ -181,15 +183,11 @@
 
         }
 
-        public final boolean isElemPersistent(Message elem) {
-            return false;
-        }
-
         public final boolean isFromStore(Message elem) {
             return false;
         }
 
-        public final void persistQueueElement(QueueStore.QueueDescriptor descriptor, ISourceController<?> controller, Message elem, long sequence, boolean delayable) throws Exception {
+        public final void persistQueueElement(SaveableQueueElement<Message> elem, ISourceController<?> controller, boolean delayable) {
             // Noop;
         }
 



Mime
View raw message