qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r761688 - /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Date Fri, 03 Apr 2009 14:15:08 GMT
Author: ritchiem
Date: Fri Apr  3 14:15:08 2009
New Revision: 761688

URL: http://svn.apache.org/viewvc?rev=761688&view=rev
Log:
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and purger threads
will stop when the inMemory values are within the correct range.

Merge of r761671 and r761674 from trunk

Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=761688&r1=761687&r2=761688&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Fri Apr  3 14:15:08 2009
@@ -359,7 +359,12 @@
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
         int inhaled = 1;
 
+        //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done
+        // enough loading and this inhale process should stop
+        boolean finshedInhaling = false;
+
         while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+               && !finshedInhaling // Have we loaded all we can fit into memory
                && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
                && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
                && (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@
             // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
             // on the return from advance() which returns true if it can advance.
             boolean atEndofList = false;
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+
+            while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory
+                   && !finshedInhaling // Have we loaded all we can fit into memory
                    && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
                    && !atEndofList) // We have reached end of list QueueEntries
             {
@@ -394,7 +401,7 @@
                         {
                             _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
                         }
-                        inhaled = BATCH_PROCESS_COUNT;
+                        finshedInhaling = true;
                     }
                     else
                     {
@@ -421,7 +428,7 @@
         }
 
         //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        if (!finshedInhaling && _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {
@@ -471,7 +478,7 @@
         _asynchronousPurger.compareAndSet(messagePurger, null);
         int purged = 0;
 
-        while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+        while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
                && purged < BATCH_PROCESS_COUNT
                && _asynchronousPurger.compareAndSet(null, messagePurger))
         {
@@ -496,6 +503,12 @@
                 if (entry.isAvailable() && !entry.isFlowed())
                 {
                     memoryUsage += entry.getSize();
+                    // If this message is what puts us over the limit then break
+                    // out of this loop as we need to purge this item.
+                    if (memoryUsage > _memoryUsageMaximum)
+                    {
+                        break;
+                    }
                 }
 
                 atTail = !iterator.advance();
@@ -525,7 +538,7 @@
         }
 
         //If we are still flowed and are over the minimum value then schedule to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
         {
             _log.info("Rescheduling Purger:" + _queue.getName());
             _purger.execute(messagePurger);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message