activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r770290 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/store/ main/java/org/apache/activemq/queue/ test/java/org/apache/activemq/broker/ test/java/org/apache/activemq/flow/
Date Thu, 30 Apr 2009 15:38:34 GMT
Author: chirino
Date: Thu Apr 30 15:38:34 2009
New Revision: 770290

URL: http://svn.apache.org/viewvc?rev=770290&view=rev
Log:
Applying AMQ-2236 patch.  Thx

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Thu Apr 30 15:38:34 2009
@@ -222,9 +222,9 @@
         elem.persist(descriptor, controller, sequence, delayable);
     }
 
-    public final void restoreQueueElements(QueueStore.QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount,
+    public final void restoreQueueElements(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
             org.apache.activemq.queue.QueueStore.RestoreListener<MessageDelivery> listener) {
-        database.restoreMessages(queue, firstSequence, maxSequence, maxCount, listener);
+        database.restoreMessages(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener);
     }
 
     public final void addQueue(QueueStore.QueueDescriptor queue) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu Apr 30 15:38:34 2009
@@ -75,7 +75,8 @@
     private AtomicBoolean notify = new AtomicBoolean(false);
     private Semaphore opsReady = new Semaphore(0);
     private long opSequenceNumber;
-    private long flushPointer = -1; // The last seq num for which flush was requested    
+    private long flushPointer = -1; // The last seq num for which flush was
+    // requested
     private long requestedDelayedFlushPointer = -1; // Set to the last sequence
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
@@ -144,11 +145,10 @@
     public synchronized void stop() throws Exception {
         if (flushThread != null) {
 
-            synchronized(opQueue)
-            {
+            synchronized (opQueue) {
                 updateFlushPointer(opSequenceNumber + 1);
             }
-            
+
             running.set(false);
             boolean interrupted = false;
             while (true) {
@@ -262,10 +262,10 @@
         synchronized (opQueue) {
             if (flushPointer < requestedDelayedFlushPointer) {
                 updateFlushPointer(requestedDelayedFlushPointer);
-                
+
             }
-            
-            //If another delayed flush has been scheduled schedule it:
+
+            // If another delayed flush has been scheduled, schedule it:
             requestedDelayedFlushPointer = -1;
             // Schedule next delay if needed:
             if (delayedFlushPointer > flushPointer) {
@@ -273,7 +273,7 @@
             } else {
                 delayedFlushPointer = -1;
             }
-            
+
         }
     }
 
@@ -345,7 +345,7 @@
                                 }
                             }, null);
                         }
-                        
+
                         if (count < 1000) {
                             op = getNextOp(false);
                         } else {
@@ -486,6 +486,8 @@
      * 
      * @param queue
      *            The queue for which to load messages
+     * @param recordsOnly
+     *            True if message body shouldn't be restored
      * @param first
      *            The first queue sequence number to load (-1 starts at
      *            begining)
@@ -497,8 +499,8 @@
      *            The listener to which messags should be passed.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext restoreMessages(QueueStore.QueueDescriptor queue, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
-        return add(new RestoreMessageOperation(queue, first, maxCount, maxSequence, listener), null, true);
+    public OperationContext restoreMessages(QueueStore.QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+        return add(new RestoreMessageOperation(queue, recordsOnly, first, maxCount, maxSequence, listener), null, true);
     }
 
     private void onDatabaseException(IOException ioe) {
@@ -668,7 +670,8 @@
          *             operations.
          */
         public void execute(Session session) throws Exception, RuntimeException {
-            if(DEBUG) System.out.println("Executing " + this);
+            if (DEBUG)
+                System.out.println("Executing " + this);
             doExcecute(session);
         }
 
@@ -813,11 +816,13 @@
         private long firstKey;
         private int maxRecords;
         private long maxSequence;
+        private boolean recordsOnly;
         private RestoreListener<MessageDelivery> listener;
         private Collection<RestoredElement<MessageDelivery>> msgs = null;
 
-        RestoreMessageOperation(QueueStore.QueueDescriptor queue, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
+        RestoreMessageOperation(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
             this.queue = queue;
+            this.recordsOnly = recordsOnly;
             this.firstKey = firstKey;
             this.maxRecords = maxRecords;
             this.maxSequence = maxSequence;
@@ -853,15 +858,12 @@
                     qRecord = records.next();
                     rm.nextSequence = qRecord.getQueueKey();
                 } else {
-                     // Look up the next sequence number:
+                    // Look up the next sequence number:
                     try {
                         records = session.queueListMessagesQueue(queue, qRecord.getQueueKey() + 1, -1L, 1);
-                        if(!records.hasNext())
-                        {
+                        if (!records.hasNext()) {
                             rm.nextSequence = -1;
-                        }
-                        else
-                        {
+                        } else {
                             rm.nextSequence = records.next().queueKey;
                         }
                     } catch (KeyNotFoundException e) {
@@ -870,24 +872,28 @@
                     qRecord = null;
                 }
 
-                try {
-                    rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
-                    rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
-                    if (rm.handler == null) {
-                        try {
-                            rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
-                            protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
-                        } catch (Throwable thrown) {
-                            throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+                if (!recordsOnly) {
+                    try {
+                        rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
+                        rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
+                        if (rm.handler == null) {
+                            try {
+                                rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
+                                protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
+                            } catch (Throwable thrown) {
+                                throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+                            }
                         }
+                        msgs.add(rm);
+                    } catch (KeyNotFoundException shouldNotHappen) {
+                        shouldNotHappen.printStackTrace();
                     }
+                } else {
                     msgs.add(rm);
-                } catch (KeyNotFoundException shouldNotHappen) {
-                    shouldNotHappen.printStackTrace();
                 }
             }
 
-            if(DEBUG)
+            if (DEBUG)
                 System.out.println("Restored: " + count + " messages");
         }
 
@@ -1007,26 +1013,61 @@
         QueueRecord qRecord;
         MessageRecord mRecord;
         ProtocolHandler handler;
-        long nextSequence = -1;
+        long nextSequence;
 
         public MessageDelivery getElement() throws IOException {
+            if (mRecord == null) {
+                return null;
+            }
+
             BrokerMessageDelivery delivery = handler.createMessageDelivery(mRecord);
             delivery.setFromDatabase(BrokerDatabase.this, mRecord);
             return delivery;
         }
 
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.RestoredElement#getSequenceNumber
+         * ()
+         */
         public long getSequenceNumber() {
             return qRecord.getQueueKey();
         }
 
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.RestoredElement#getStoreTracking
+         * ()
+         */
         public long getStoreTracking() {
             return qRecord.getMessageKey();
         }
 
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.RestoredElement#getNextSequenceNumber()
+         */
         public long getNextSequenceNumber() {
             return nextSequence;
         }
 
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.RestoredElement#getElementSize()
+         */
+        public int getElementSize() {
+            // Read the size from the queue record; mRecord may be null for records-only restores.
+            return qRecord.getSize();
+        }
+
     }
 
     public long allocateStoreTracking() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java Thu Apr 30 15:38:34 2009
@@ -27,12 +27,14 @@
     public interface SaveableQueueElement<V> {
         /**
          * Gets the element to save.
+         * 
          * @return
          */
         public V getElement();
 
         /**
          * Gets the sequence number of the element in the queue
+         * 
          * @return
          */
         public long getSequenceNumber();
@@ -57,12 +59,17 @@
      */
     public interface RestoredElement<V> {
         /**
-         * @return Gets the restored element
+         * @return Gets the restored element (possibly null if not requested)
          * @throws Exception
          */
         public V getElement() throws Exception;
 
         /**
+         * @return The element size.
+         */
+        int getElementSize();
+
+        /**
          * Returns the sequence number of this element in the queue
          * 
          * @return the sequence number of this element
@@ -217,14 +224,17 @@
     }
 
     /**
-     * Loads a batch of messages for the specified queue. The loaded messages
-     * are given the provided {@link MessageRestoreListener}.
+     * Loads a series of elements for the specified queue. The loaded elements
+     * are given to the provided {@link RestoreListener}.
      * <p>
      * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
      * message not the store tracking number.
      * 
      * @param queue
      *            The queue for which to load messages
+     * @param recordOnly
+     *            True if only the record data should be returned (excluding the
+     *            element itself)
      * @param firstSequence
      *            The first queue sequence number to load (-1 starts at
      *            beginning)
@@ -236,7 +246,7 @@
      *            The listener to which restored elements should be passed.
      * @return The {@link OperationContext} associated with the operation
      */
-    public void restoreQueueElements(QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount, RestoreListener<V> listener);
+    public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<V> listener);
 
     /**
      * Asynchronously deletes an element from the store.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Thu Apr 30 15:38:34 2009
@@ -59,7 +59,7 @@
     // which is used for tracking page in requests. A trailing
     // consumer will request messages from at most one restoreBlock
     // at a time from the database.
-    private static final int RESTORE_BLOCK_SIZE = 50;
+    private static final int RESTORE_BLOCK_SIZE = 1000;
 
     private static final int ACCEPTED = 0;
     private static final int NO_MATCH = 1;
@@ -93,9 +93,7 @@
     private final IFlowSizeLimiter<V> sizeLimiter;
 
     // Memory Limiter and controller operate against the liveCursor.
-    private static final long DEFAULT_MEMORY_LIMIT = 1536;
-    private final IFlowSizeLimiter<QueueElement> memoryLimiter;
-    private final FlowController<QueueElement> memoryController;
+    private static final long DEFAULT_MEMORY_LIMIT = 1000;
     private boolean useMemoryLimiter;
 
     private int totalQueueCount;
@@ -109,7 +107,6 @@
 
     SharedQueue(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
         super(name);
-        liveCursor = new Cursor(name);
         this.mutex = mutex == null ? new Object() : mutex;
 
         flow = new Flow(getResourceName(), false);
@@ -122,24 +119,12 @@
         sizeController.useOverFlowQueue(false);
         super.onFlowOpened(sizeController);
 
-        if (DEFAULT_MEMORY_LIMIT < limiter.getCapacity()) {
-            memoryLimiter = new SizeLimiter<QueueElement>(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) {
-                public int getElementSize(QueueElement qe) {
-                    return qe.size;
-                };
-            };
+        liveCursor = new Cursor(name);
 
-            memoryController = new FlowController<QueueElement>(null, flow, memoryLimiter, mutex) {
-                @Override
-                public IFlowResource getFlowResource() {
-                    return SharedQueue.this;
-                }
-            };
+        if (DEFAULT_MEMORY_LIMIT < limiter.getCapacity()) {
             useMemoryLimiter = true;
         } else {
             useMemoryLimiter = false;
-            memoryLimiter = null;
-            memoryController = null;
         }
 
         loader = new ElementLoader();
@@ -211,7 +196,7 @@
 
             if (!started) {
                 started = true;
-                liveCursor.getNext();
+                liveCursor.activate();
                 if (isDispatchReady()) {
                     notifyReady();
                 }
@@ -248,7 +233,8 @@
             if (!store.isFromStore(elem) && store.isElemPersistent(elem)) {
                 try {
                     // TODO Revisit delayability criteria (basically,
-                    // opened, unblocked receivers)
+                    // opened, unblocked receivers, that aren't too far
+                    // from this element)
                     store.persistQueueElement(queueDescriptor, source, elem, qe.sequence, true);
 
                 } catch (Exception e) {
@@ -257,10 +243,9 @@
                 }
             }
 
-            //Add it to our queue:
+            // Add it to our queue:
             queue.add(qe);
             totalQueueCount++;
-            
             // Check with the loader to see if it needs to be paged out:
             loader.elementAdded(qe, source);
 
@@ -335,7 +320,7 @@
                 }
 
                 // Request page in if the next element is paged out:
-                next = liveCursor.getNext();
+                liveCursor.getNext();
             }
             return isDispatchReady();
         }
@@ -410,6 +395,9 @@
         SubscriptionContext(Subscription<V> target) {
             this.cursor = new Cursor(target.toString());
             this.sub = target;
+            // TODO If this consumer doesn't have a selector
+            // and there are other consumers without a selector
+            // we can join the live cursor as well.
             if (queue.isEmpty()) {
                 cursor.reset(liveCursor.sequence);
             } else {
@@ -503,7 +491,6 @@
             if (sub.offer(qe.elem, this, callback)) {
                 if (!sub.isBrowser()) {
                     qe.setAcquired(this);
-                    loader.releaseMemory(qe);
 
                     // If this came from the live cursor, update it
                     // if we acquired the element:
@@ -570,14 +557,73 @@
     class Cursor implements Comparable<Cursor> {
 
         private final String name;
+        private boolean activated = false;;
+
+        // The next element for this cursor
         QueueElement current = null;
+        // The current sequence number for this cursor,
+        // used when inactive or pointing to an element
+        // sequence number beyond the queue's limit.
         long sequence = -1;
-        boolean paging = false;
-        long restoreBlock = -1;
-        long requestedBlock = -1;
+
+        // The cursor is holding references for all
+        // elements between first and last inclusive:
+        QueueElement firstRef = null;
+        QueueElement lastRef = null;
+
+        // Each cursor can optionally be memory limited
+        // When the limiter is set the cursor is able to
+        // keep as many elements in memory as its limiter
+        // allows.
+        private final IFlowSizeLimiter<QueueElement> memoryLimiter;
+        private final IFlowController<QueueElement> memoryController;
 
         public Cursor(String name) {
             this.name = name;
+            if (DEFAULT_MEMORY_LIMIT < sizeLimiter.getCapacity()) {
+                memoryLimiter = new SizeLimiter<QueueElement>(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) {
+                    public int getElementSize(QueueElement qe) {
+                        return qe.size;
+                    };
+                };
+
+                memoryController = new FlowController<QueueElement>(null, flow, memoryLimiter, mutex) {
+                    @Override
+                    public IFlowResource getFlowResource() {
+                        return SharedQueue.this;
+                    }
+                };
+            } else {
+                memoryLimiter = null;
+                memoryController = null;
+            }
+        }
+
+        /**
+         * Offers a queue element to the cursor's memory limiter. The cursor will
+         * return true if it has room for it in memory.
+         * 
+         * @param qe
+         *            The element for which to check.
+         * @return
+         */
+        public boolean offer(QueueElement qe) {
+            if (activated && memoryLimiter != null) {
+                if (current == null) {
+                    getNext();
+                }
+                checkPageIn();
+                if (lastRef != null) {
+                    // Return true if we absorbed it:
+                    if (qe.sequence <= lastRef.sequence && qe.sequence >= firstRef.sequence) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            // Always accept an element if not memory
+            // limited providing we're active:
+            return activated;
         }
 
         public final void reset(long sequence) {
@@ -585,40 +631,101 @@
             current = null;
         }
 
+        public void activate() {
+            activated = true;
+            getNext();
+        }
+
         public void deactivate() {
-            if (paging) {
-                loader.removeBlockInterest(this);
-                requestedBlock = -1;
+            activated = false;
+
+            // Release all of our references:
+            long block = -1;
+            while (firstRef != null) {
+                block = firstRef.restoreBlock;
+                firstRef.releaseCursorRef(this);
+                memoryLimiter.remove(firstRef);
+
+                if (firstRef == lastRef) {
+                    loader.releaseBlock(this, block);
+                    firstRef = lastRef = null;
+                } else {
+                    firstRef = firstRef.getNext();
+                    if (firstRef.restoreBlock != block) {
+                        loader.releaseBlock(this, block);
+                    }
+                }
             }
+
+            // Let go of our current ref:
             current = null;
         }
 
-        private final void updateSequence(final long newSequence) {
-            this.sequence = newSequence;
-            // long newBlock = sequence / RESTORE_BLOCK_SIZE;
-            // if (newBlock != restoreBlock) {
-            // restoreBlock = newBlock;
-            // }
+        /**
+         * Makes sure elements are paged in
+         */
+        private final void checkPageIn() {
+            if (!activated)
+                return;
 
-            if (DEBUG && sequence > nextSequenceNumber) {
-                new Exception(this + "cursor overflow").printStackTrace();
+            if (memoryLimiter != null) {
+
+                // Release memory references up to our sequence number
+                long block = -1;
+                while (firstRef != null && firstRef.getSequence() < sequence) {
+                    block = firstRef.restoreBlock;
+                    firstRef.releaseCursorRef(this);
+                    memoryLimiter.remove(firstRef);
+
+                    if (firstRef == lastRef) {
+                        firstRef = lastRef = null;
+                    } else {
+                        firstRef = firstRef.getNext();
+                        // If we're passing into a new block
+                        // release the old one:
+                        if (firstRef.restoreBlock != block) {
+                            loader.releaseBlock(this, block);
+                        }
+                    }
+                }
+
+                // Now add refs for as many elements as we can hold:
+                QueueElement next = null;
+
+                if (lastRef == null) {
+                    next = current;
+                } else {
+                    next = lastRef.getNext();
+                }
+
+                while (next != null && !memoryLimiter.getThrottled()) {
+                    if (next.isLoaded()) {
+                        next.addCursorRef(this);
+                        if (firstRef == null) {
+                            firstRef = next;
+                        }
+                        memoryLimiter.add(next);
+                        lastRef = next;
+                        next = lastRef.getNext();
+                    } else {
+                        // TODO track our currently requested block to avoid
+                        // calling this
+                        // repeatedly
+                        loader.loadBlock(this, next.restoreBlock);
+                        break;
+                    }
+                }
+            }
+            // Otherwise we still need to ensure the block has been loaded:
+            else if (current != null && !current.isLoaded()) {
+                loader.loadBlock(this, current.restoreBlock);
             }
         }
 
-        private final void checkPageIn() {
-            if (current != null && current.isPagedOut()) {
-                if (current.restoreBlock != requestedBlock) {
-                    if (paging) {
-                        loader.removeBlockInterest(this);
-                    }
-                    requestedBlock = current.restoreBlock;
-                    paging = true;
-                    loader.addBlockInterest(this, current);
-                }
-            } else if (paging && requestedBlock != sequence / RESTORE_BLOCK_SIZE) {
-                loader.removeBlockInterest(this);
-                requestedBlock = -1;
-                paging = false;
+        private final void updateSequence(final long newSequence) {
+            this.sequence = newSequence;
+            if (DEBUG && sequence > nextSequenceNumber) {
+                new Exception(this + "cursor overflow").printStackTrace();
             }
         }
 
@@ -627,6 +734,9 @@
          *         for dispatch
          */
         public final boolean isReady() {
+            if (!activated)
+                return false;
+
             getNext();
             // Possible when the queue is empty
             if (current == null || current.isAcquired() || current.isPagedOut()) {
@@ -641,6 +751,7 @@
          */
         public final void skip(QueueElement elem) {
             QueueElement next = elem.isLinked() ? elem.getNext() : null;
+
             if (next != null) {
                 updateSequence(next.sequence);
                 current = next;
@@ -660,10 +771,8 @@
                 current = queue.getTail();
             }
 
-            // Get a pointer to the next element (make sure
-            // that our next pointer is linked, it could have
-            // been paged out):
-            if (current == null || !current.isLinked()) {
+            // Get a pointer to the next element
+            if (current == null) {
                 current = queue.upper(sequence, true);
                 if (current == null) {
                     return null;
@@ -717,8 +826,11 @@
     class QueueElement extends SortedLinkedListNode<QueueElement> implements Subscription.SubscriptionDeliveryCallback {
 
         V elem;
+
         SubscriptionContext owner;
         final long sequence;
+        int size = -1;
+
         long restoreBlock;
 
         // When a queue element is paged out, the first element
@@ -726,10 +838,16 @@
         // and size of paged out elements.
         int pagedOutCount = 0;
         long pagedOutSize = 0;
-        int size = 0;
+
+        // Cursors that have referenced this element,
+        // When this drops to 0 we can page out the
+        // element.
+        int cursorRefs = 0;
+        boolean acked = false;
 
         public QueueElement(V elem, long sequence) {
             this.elem = elem;
+
             if (elem != null) {
                 size = sizeLimiter.getElementSize(elem);
             }
@@ -737,20 +855,49 @@
             this.restoreBlock = sequence / RESTORE_BLOCK_SIZE;
         }
 
-        public void setAcquired(SubscriptionContext owner) {
+        public QueueElement(RestoredElement<V> restored) throws Exception {
+            this(restored.getElement(), restored.getSequenceNumber());
+            this.size = restored.getElementSize();
+        }
+
+        @Override
+        public final long getSequence() {
+            return sequence;
+        }
+
+        public final void addCursorRef(Cursor cursor) {
+            cursorRefs++;
+            if (elem == null) {
+                // If this is the first request for this
+                // element request a load:
+                if (cursorRefs == 1) {
+                    loader.pageIn(this);
+                }
+            }
+        }
+
+        public final void releaseCursorRef(Cursor cursor) {
+            cursorRefs--;
+            if (cursorRefs == 0) {
+                // TODO need a controller:
+                unload(cursor.memoryController);
+            }
+        }
+
+        public final void setAcquired(SubscriptionContext owner) {
             this.owner = owner;
-            sizeController.elementDispatched(elem);
         }
 
         public final void acknowledge() {
             synchronized (mutex) {
-                unlink();
+                acked = true;
+                owner = null;
                 totalQueueCount--;
-                if (isPagedOut()) {
-                    return;
-                } else if (store.isElemPersistent(elem) || store.isFromStore(elem)) {
+                sizeController.elementDispatched(elem);
+                if (store.isElemPersistent(elem) || store.isFromStore(elem)) {
                     store.deleteQueueElement(queueDescriptor, elem);
                 }
+                unload(null);
             }
         }
 
@@ -762,78 +909,87 @@
         }
 
         /**
-         * Pages this element out to free memory.
-         * 
-         * Memory is freed if upon the return of this call the passed in
-         * sizeController was not blocked. If the sizeController was blocked
-         * then memory is not freed until it is next unblocked.
+         * Attempts to unlink this element from the queue
          */
-        private void pageOut(ISourceController<?> controller) {
-            // See if we can page this out to save memory:
-            //
-            // Disqualifiers:
-            // - If this is already paged out then nothing to do
-            // - We don't page out acquired elements, the memory
-            // is accounted for by the owner, and we keep them
-            // in memory here, to make sure we don't try to pull
-            // it back in for another consumer
-            // - If there is a cursor active in this element's
-            // restore block don't page out, memory is accounted
-            // for in the cursor's sizeLimiter
-            if (pagedOutCount > 0 || owner != null || loader.inLoadQueue(this)) {
+        public final void unload(ISourceController<?> controller) {
+            // Unlink this element from the queue. Don't unlink
+            // if the element is acquired or if it is in the load
+            // queue.
+            if (cursorRefs > 0 || loader.inLoadQueue(this) || owner != null) {
                 return;
             }
 
-            // If the element is not persistent then we'll need to request a
-            // save:
-            if (!store.isFromStore(elem) && !store.isElemPersistent(elem)) {
-                try {
-                    store.persistQueueElement(queueDescriptor, controller, elem, sequence, false);
-                } catch (Exception e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
-            }
-
-            pagedOutCount = 1;
-            pagedOutSize = size;
-            elem = null;
-
-            // Collapse adjacent paged out elements:
             QueueElement next = getNext();
             QueueElement prev = getPrevious();
-            // If the next element is paged out
-            // replace it with this
-            if (next != null && next.pagedOutCount > 0) {
-                pagedOutCount += next.pagedOutCount;
-                pagedOutSize += next.pagedOutSize;
-                next.unlink();
-            }
-            // If the previous elem is paged out unlink this
-            // entry:
-            if (prev != null && prev.pagedOutCount > 0) {
-                prev.pagedOutCount += pagedOutCount;
-                prev.pagedOutSize += pagedOutSize;
+
+            // If acked unlink this element from the queue, and link
+            // together adjacent paged out entries:
+            if (acked) {
                 unlink();
+                // If both next and previous entries are unloaded,
+                // then collapse them:
+                if (next != null && prev != null && next.pagedOutCount > 0 && prev.pagedOutCount > 0) {
+                    prev.pagedOutCount += next.pagedOutCount;
+                    prev.pagedOutSize += next.pagedOutSize;
+                    next.unlink();
+                }
+            }
+            // Otherwise page out this element
+            else if (elem != null) {
+                // If the element is not persistent then we'll need to request a
+                // save:
+                if (!store.isFromStore(elem) && !store.isElemPersistent(elem)) {
+                    try {
+                        store.persistQueueElement(queueDescriptor, controller, elem, sequence, false);
+                    } catch (Exception e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                }
+
+                pagedOutCount = 1;
+                pagedOutSize = size;
+                elem = null;
+
+                // If the next element is unloaded
+                // replace it with this
+                if (next != null && next.pagedOutCount > 0) {
+                    pagedOutCount += next.pagedOutCount;
+                    pagedOutSize += next.pagedOutSize;
+                    next.unlink();
+                }
+
+                // If the previous elem is paged out unlink this
+                // entry:
+                if (prev != null && prev.pagedOutCount > 0) {
+                    prev.pagedOutCount += pagedOutCount;
+                    prev.pagedOutSize += pagedOutSize;
+                    unlink();
+                }
             }
 
             if (DEBUG)
                 System.out.println("Paged out element: " + this);
+
         }
 
         /**
-         * Called to relink a paged in element after this element.
+         * Called to relink a loaded element after this element.
          * 
          * @param qe
          *            The paged in element to relink.
          */
-        public QueueElement pagedIn(QueueElement qe, long nextSequence) {
+        public final QueueElement loadAfter(QueueElement qe, long nextSequence) {
             QueueElement ret = qe;
             // See if we have a pointer to a paged out element:
             if (sequence == qe.sequence) {
                 // Already paged in? Shouldn't be.
                 if (!isPagedOut()) {
-                    throw new IllegalStateException("Can't page in an already paged in element");
+                    // throw new
+                    // IllegalStateException("Can't page in an already paged in element");
+                    if (DEBUG) {
+                        System.out.println("Paged in already paged in element: " + this);
+                    }
                 } else {
                     // Otherwise set this element to the paged in one
                     // and add a new QueueElement to hold any additional
@@ -856,16 +1012,15 @@
                             e.printStackTrace();
                         }
                     }
-                    ret = this;
                 }
+                ret = this;
             } else {
                 // Otherwise simply link this element into the list:
-                 queue.add(qe);
+                queue.add(qe);
                 // Decrement pagedOutCount counter of previous element:
                 if (qe.prev != null && qe.prev.pagedOutCount > 0) {
-                    if(qe.prev.pagedOutCount > 1)
-                    {
-                        throw new IllegalStateException("Skipped paged in element");    
+                    if (qe.prev.pagedOutCount > 1) {
+                        throw new IllegalStateException("Skipped paged in element");
                     }
                     pagedOutCount = qe.pagedOutCount - 1;
                     qe.prev.pagedOutCount = 0;
@@ -879,22 +1034,21 @@
             return ret;
         }
 
-        public boolean isPagedOut() {
+        public final boolean isPagedOut() {
             return elem == null;
         }
 
-        public boolean isAcquired() {
+        public final boolean isLoaded() {
+            return pagedOutCount == 0;
+        }
+
+        public final boolean isAcquired() {
             return owner != null;
         }
 
         public String toString() {
             return "QueueElement " + sequence + " pagedOutCount: " + pagedOutCount + " owner: " + owner;
         }
-
-        @Override
-        public long getSequence() {
-            return sequence;
-        }
     }
 
     /**
@@ -905,8 +1059,8 @@
      * redelivered status of elements.
      * 
      * If the queue's memory limit is less than the queue size then this class
-     * tracks cursor activity in the queue, loading elements into memory as they
-     * are needed.
+     * tracks cursor activity in the queue, loading/unloading elements into
+     * memory as they are needed.
      * 
      * @author cmacnaug
      */
@@ -920,6 +1074,13 @@
         }
 
         /**
+         * @param qe the element whose sequence number is requested from the store
+         */
+        public void pageIn(QueueElement qe) {
+            store.restoreQueueElements(queueDescriptor, false, qe.sequence, qe.sequence, 1, this);
+        }
+
+        /**
          * Must be called after an element is added to the queue to enforce
          * memory limits
          * 
@@ -929,79 +1090,97 @@
          *            The source of the message
          */
         public final void elementAdded(QueueElement qe, ISourceController<V> source) {
+
             if (useMemoryLimiter) {
-                if (!qe.isPagedOut()) {
-                    memoryController.add(qe, source);
 
-                    if (memoryLimiter.getThrottled()) {
+                // Check with the live cursor to see if it is willing to
+                // absorb the element. If so that's good enough.
+                if (liveCursor.offer(qe)) {
+                    return;
+                }
 
-                        qe.pageOut(memoryController);
-                        // If we paged it out release memory:
-                        if (qe.isPagedOut()) {
-                            releaseMemory(qe);
-                        }
+                // Find a cursor willing to accept the element:
+                HashSet<Cursor> active = requestedBlocks.get(qe.sequence);
+
+                // If there are none, unload the element:
+                if (active == null) {
+                    qe.unload(source);
+                    return;
+                }
+
+                // See if a cursor is willing to hang on to the
+                // element:
+                boolean accepted = false;
+                for (Cursor cursor : active) {
+                    // Already checked the live cursor above:
+                    if (cursor == liveCursor) {
+                        continue;
+                    }
+
+                    if (cursor.offer(qe)) {
+                        accepted = true;
                     }
                 }
-            }
-        }
 
-        //Updates memory when an element is loaded from the database:
-        private final void elementLoaded(QueueElement qe) {
-            if (useMemoryLimiter) {
-                memoryController.add(qe, null);
+                // If no cursor accepted it, then try to unload the element
+                // (unload() returns early if it is still in use).
+                if (!accepted) {
+                    qe.unload(source);
+                }
             }
         }
 
-        public final void releaseMemory(QueueElement qe) {
-            if (useMemoryLimiter) {
-                memoryController.elementDispatched(qe);
-            }
+        // Updates memory when an element is loaded from the database:
+        private final void elementLoaded(QueueElement qe) {
+            // TODO track the rate of loaded elements vs those that
+            // are added to the queue. We'll want to throttle back
+            // enqueueing sources to a rate less than the restore
+            // rate so we can stay out of the store.
         }
 
-        public void addBlockInterest(Cursor cursor, QueueElement element) {
-            HashSet<Cursor> cursors = requestedBlocks.get(cursor.requestedBlock);
+        public void loadBlock(Cursor cursor, long block) {
+            HashSet<Cursor> cursors = requestedBlocks.get(block);
             if (cursors == null) {
                 cursors = new HashSet<Cursor>();
-                requestedBlocks.put(cursor.requestedBlock, cursors);
+                requestedBlocks.put(block, cursors);
 
                 // Max sequence number is the end of this restoreBlock:
-                long maxSequence = (cursor.requestedBlock * RESTORE_BLOCK_SIZE) + RESTORE_BLOCK_SIZE;
+                long firstSequence = block * RESTORE_BLOCK_SIZE;
+                long maxSequence = block * RESTORE_BLOCK_SIZE + RESTORE_BLOCK_SIZE - 1;
                 // Don't pull in more than is paged out:
-                int maxCount = Math.min(element.pagedOutCount, RESTORE_BLOCK_SIZE);
-                if(DEBUG)
-                    System.out.println(cursor + " requesting restoreBlock:" + cursor.requestedBlock + " from " + element.getSequence() + " to " + maxSequence + " max: " + maxCount);
-                store.restoreQueueElements(queueDescriptor, element.getSequence(), maxSequence, maxCount, this);
+                // int maxCount = Math.min(element.pagedOutCount,
+                // RESTORE_BLOCK_SIZE);
+                int maxCount = RESTORE_BLOCK_SIZE;
+                if (DEBUG)
+                    System.out.println(cursor + " requesting restoreBlock:" + block + " from " + firstSequence + " to " + maxSequence + " max: " + maxCount + " queueMax: " + nextSequenceNumber);
+
+                // If we are memory limited only pull in queue records, don't
+                // bring in the payload.
+                // Each active cursor will have to pull in messages based on
+                // available memory.
+                store.restoreQueueElements(queueDescriptor, useMemoryLimiter, firstSequence, maxSequence, maxCount, this);
             }
             cursors.add(cursor);
         }
 
-        public void removeBlockInterest(Cursor cursor) {
-            long block = cursor.requestedBlock;
+        public void releaseBlock(Cursor cursor, long block) {
             HashSet<Cursor> cursors = requestedBlocks.get(block);
             if (cursors == null) {
-                if (DEBUG)
+                if (true || DEBUG)
                     System.out.println(this + " removeBlockInterest, no consumers " + cursor);
             } else {
                 if (cursors.remove(cursor)) {
                     if (cursors.isEmpty()) {
-                        requestedBlocks.remove(cursor.requestedBlock);
-                        //If this is the last cursor active in this block page out the block:
-                        if(useMemoryLimiter)
-                        {
-                            QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * cursor.requestedBlock, true);
-                            while(qe != null && qe.restoreBlock == block)
-                            {
+                        requestedBlocks.remove(block);
+                        // If this is the last cursor active in this block page
+                        // out the block:
+                        if (useMemoryLimiter) {
+                            QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
+                            while (qe != null && qe.restoreBlock == block) {
                                 QueueElement next = qe.getNext();
-                                if(!qe.isPagedOut())
-                                {
-                                    qe.pageOut(memoryController);
-                                    // If we paged it out release memory:
-                                    if (qe.isPagedOut()) {
-                                        System.out.println(this + " removeBlockInterest, released memory for: " + this);
-                                        releaseMemory(qe);
-                                    }
-                                    qe = next;
-                                }
+                                // TODO use the cursor's flow controller:
+                                qe.unload(null);
+                                qe = next;
                             }
                         }
                     }
@@ -1010,7 +1189,7 @@
                         System.out.println(this + " removeBlockInterest, no cursor " + cursor);
                 }
             }
-            
+
         }
 
         /**
@@ -1033,12 +1212,26 @@
                 // boolean trailingRestore = false;
                 for (QueueStore.RestoredElement<V> restored : restoredElems) {
                     try {
-                        V delivery = restored.getElement();
-                        QueueElement qe = new QueueElement(delivery, restored.getSequenceNumber());
+                        // V delivery = restored.getElement();
+                        // TODO Might be better to change the loadAfter
+                        // signature to directly take the RestoredElement:
+                        // This would avoid creating a QueueElement that might
+                        // not be needed if the element is already paged in:
+                        QueueElement qe = new QueueElement(restored);
                         QueueElement lower = queue.lower(qe.sequence, true);
-                        qe = lower.pagedIn(qe, restored.getNextSequenceNumber());
+                        qe = lower.loadAfter(qe, restored.getNextSequenceNumber());
                         loader.elementLoaded(qe);
 
+                        // If we are not memory limited remove the request block
+                        // entry
+                        // as soon as we load the block:
+                        if (!useMemoryLimiter) {
+                            requestedBlocks.remove(qe.restoreBlock);
+                        }
+
+                        if (DEBUG)
+                            System.out.println(this + " Loaded loaded" + qe);
+
                     } catch (Exception ioe) {
                         ioe.printStackTrace();
                         shutdown();
@@ -1068,7 +1261,7 @@
         }
 
         public String toString() {
-            return "MsgRetriever " + SharedQueue.this;
+            return "QueueLoader " + SharedQueue.this;
         }
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu Apr 30 15:38:34 2009
@@ -58,7 +58,7 @@
     protected boolean multibroker = false;
 
     // Set to mockup up ptp:
-    protected boolean ptp = false;
+    protected boolean ptp = true;
 
     // Set to use tcp IO
     protected boolean tcp = true;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Apr 30 15:38:34 2009
@@ -43,7 +43,7 @@
     protected boolean multibroker = false;
 
     // Set to mockup up ptp:
-    protected boolean ptp = false;
+    protected boolean ptp = true;
 
     // Set to use tcp IO
     protected boolean tcp = false;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=770290&r1=770289&r2=770290&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu Apr 30 15:38:34 2009
@@ -193,7 +193,7 @@
             // Noop;
         }
 
-        public final void restoreQueueElements(QueueStore.QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount, QueueStore.RestoreListener<Message> listener) {
+        public final void restoreQueueElements(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, QueueStore.RestoreListener<Message> listener) {
             throw new UnsupportedOperationException("Mock broker doesn't support persistence");
         }
 



Mime
View raw message