qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r761671 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Date Fri, 03 Apr 2009 13:33:42 GMT
Author: ritchiem
Date: Fri Apr  3 13:33:42 2009
New Revision: 761671

URL: http://svn.apache.org/viewvc?rev=761671&view=rev
Log:
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and purger threads
will stop when the inMemory values are within the correct range.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=761671&r1=761670&r2=761671&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Fri Apr  3 13:33:42 2009
@@ -359,7 +359,12 @@
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
         int inhaled = 1;
 
+        //Because we may not be able to totally fill up to _memoryUsageMaximum we need to
be able to say we've done
+        // enough loading and this inhale process should stop
+        boolean finshedInhaling = false;
+
         while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled
our max memory
+               && !finshedInhaling // Have we loaded all we can fit into memory
                && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we
haven't loaded all that is available
                && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs
we do
                && (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@
             // we won't have checked the last entry to see if we can load it. So create atEndofList
and update it based
             // on the return from advance() which returns true if it can advance.
             boolean atEndofList = false;
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled
our max memory
+
+            while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled
our max memory
+                   && !finshedInhaling // Have we loaded all we can fit into memory
                    && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs
we do
                    && !atEndofList) // We have reached end of list QueueEntries
             {
@@ -394,7 +401,7 @@
                         {
                             _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
                         }
-                        inhaled = BATCH_PROCESS_COUNT;
+                        finshedInhaling = true;
                     }
                     else
                     {
@@ -421,7 +428,7 @@
         }
 
         //If we have become flowed or have more capacity since we stopped then schedule the
thread to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        if (!finshedInhaling _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {
@@ -471,7 +478,7 @@
         _asynchronousPurger.compareAndSet(messagePurger, null);
         int purged = 0;
 
-        while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+        while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
                && purged < BATCH_PROCESS_COUNT
                && _asynchronousPurger.compareAndSet(null, messagePurger))
         {
@@ -496,6 +503,12 @@
                 if (entry.isAvailable() && !entry.isFlowed())
                 {
                     memoryUsage += entry.getSize();
+                    // If this message is what puts us over the limit then break
+                    // out of this loop as we need to purge this item.
+                    if (memoryUsage > _memoryUsageMaximum)
+                    {
+                        break;
+                    }
                 }
 
                 atTail = !iterator.advance();
@@ -525,7 +538,7 @@
         }
 
         //If we are still flowed and are over the minimum value then schedule to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message