qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r758758 - in /qpid/branches/0.5-release/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/
Date Thu, 26 Mar 2009 17:27:31 GMT
Author: ritchiem
Date: Thu Mar 26 17:27:26 2009
New Revision: 758758

URL: http://svn.apache.org/viewvc?rev=758758&view=rev
Log:
QPID-1768 : Removed all the special priority queue code. Added the ability for a FlowableBaseQueueEntryList
to delegate its ac[counting ... -- remainder of sentence truncated in the archive]
        - New Messages on a flowed queue are optimistically pushed
to disk, this should potentially be removed and just rel[... -- remainder of sentence truncated in the archive]
        - When space is available
messages are loaded in queue order, so in this case Priority order.
Merged revision r758742

Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=758758&r1=758757&r2=758758&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
(original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Thu Mar 26 17:27:26 2009
@@ -54,6 +54,7 @@
     protected boolean _disableFlowToDisk;
     private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
     private static final int BATCH_PROCESS_COUNT = 100;
+    protected FlowableBaseQueueEntryList _parentQueue;
 
     FlowableBaseQueueEntryList(AMQQueue queue)
     {
@@ -89,7 +90,7 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+            _log.debug(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed()
                        + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
                        + "/" + dataSize());
         }
@@ -97,7 +98,14 @@
 
     public boolean isFlowed()
     {
-        return _flowed.get();
+        if (_parentQueue != null)
+        {
+            return _parentQueue.isFlowed();
+        }
+        else
+        {
+            return _flowed.get();
+        }
     }
 
     public int size()
@@ -204,12 +212,19 @@
      */
     public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
     {
-        if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+        if (_parentQueue != null)
         {
-            _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+            _parentQueue.entryUnloadedUpdateMemory(queueEntry);
         }
+        else
+        {
+            if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+            {
+                _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+            }
 
-        checkAndStartInhaler();
+            checkAndStartInhaler();
+        }
     }
 
     /**
@@ -219,11 +234,18 @@
      */
     public void entryLoadedUpdateMemory(QueueEntry queueEntry)
     {
-        if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+        if (_parentQueue != null)
         {
-            _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
-            setFlowed(true);
-            startPurger();
+            _parentQueue.entryLoadedUpdateMemory(queueEntry);
+        }
+        else
+        {
+            if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+            {
+                _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
+                setFlowed(true);
+                startPurger();
+            }
         }
     }
 
@@ -241,28 +263,55 @@
         }
     }
 
-    protected void incrementCounters(final QueueEntryImpl queueEntry)
+    /**
+     * Mark this queue as part of another QueueEntryList for accounting purposes.
+     *
+     * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+     * a parent QueueEntrylist upon which the action should take place.
+     *
+     * @param queueEntryList The parent queue that is performing accounting.
+     */    
+    public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList)
     {
-        _atomicQueueCount.incrementAndGet();
-        _atomicQueueSize.addAndGet(queueEntry.getSize());
-        long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+        _parentQueue = queueEntryList;
+    }
 
-        if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+    protected void incrementCounters(final QueueEntryImpl queueEntry)
+    {
+        if (_parentQueue != null)
+        {
+            _parentQueue.incrementCounters(queueEntry);
+        }
+        else
         {
-            setFlowed(true);
-            queueEntry.unload();
+            _atomicQueueCount.incrementAndGet();
+            _atomicQueueSize.addAndGet(queueEntry.getSize());
+            long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+
+            if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+            {
+                setFlowed(true);
+                queueEntry.unload();
+            }
         }
     }
 
     protected void dequeued(QueueEntryImpl queueEntry)
     {
-        _atomicQueueCount.decrementAndGet();
-        _atomicQueueSize.addAndGet(-queueEntry.getSize());
-        if (!queueEntry.isFlowed())
+        if (_parentQueue != null)
         {
-            if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+            _parentQueue.dequeued(queueEntry);
+        }
+        else
+        {
+            _atomicQueueCount.decrementAndGet();
+            _atomicQueueSize.addAndGet(-queueEntry.getSize());
+            if (!queueEntry.isFlowed())
             {
-                _log.error("InMemory Count just went below 0 on dequeue.");
+                if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+                {
+                    _log.error("InMemory Count just went below 0 on dequeue.");
+                }
             }
         }
     }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java?rev=758758&r1=758757&r2=758758&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
(original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
Thu Mar 26 17:27:26 2009
@@ -39,6 +39,7 @@
         for (int i = 0; i < priorities; i++)
         {
             _priorityLists[i] = new SimpleQueueEntryList(queue);
+            _priorityLists[i].setParentQueueEntryList(this);
         }
 
         showUsage("Created:" + _queue.getName());
@@ -66,183 +67,9 @@
             index = 0;
         }
 
-        long requriedSize = message.getSize();
-        // Check and see if list would flow on adding message
-        if (!_disableFlowToDisk && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
-        {
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
-                              + requriedSize + ") will cause flow. Searching for space");
-            }
-
-            long reclaimed = 0;
-
-            //work down the priorities looking for memory
-
-            //First: Don't take all the memory. So look for a queue that has more than 50% free
-            long currentMax;
-            int scavangeIndex = 0;
-
-            if (scavangeIndex == index)
-            {
-                scavangeIndex++;
-            }
-
-            while (scavangeIndex < _priorities  && reclaimed <= requriedSize)
-            {
-                currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-                long used = _priorityLists[scavangeIndex].memoryUsed();
-
-                if (used < currentMax / 2)
-                {
-                    long newMax = currentMax / 2;
-
-                    _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
-                    reclaimed += currentMax - newMax;
-                    if (_log.isDebugEnabled())
-                    {
-                        _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
-                    }
-                    break;
-                }
-                else
-                {
-                    scavangeIndex++;
-                    if (scavangeIndex == index)
-                    {
-                        scavangeIndex++;
-                    }
-                }
-            }                                   
-
-            //Second: Just take the free memory we need
-            if (scavangeIndex == _priorities)
-            {
-                scavangeIndex = 0;
-                if (scavangeIndex == index)
-                {
-                    scavangeIndex++;
-                }
-
-                while (scavangeIndex < _priorities && reclaimed <= requriedSize)
-                {
-                    currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-                    long used = _priorityLists[scavangeIndex].memoryUsed();
-
-                    if (used < currentMax)
-                    {
-                        long newMax = currentMax - used;
-
-                        // if there are no messages at this priority just take it all
-                        if (newMax == currentMax)
-                        {
-                            newMax = 0;
-                        }
-
-                        _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
-                        reclaimed += currentMax - newMax;
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
-                        }
-                        break;
-                    }
-                    else
-                    {
-                        scavangeIndex++;
-                        if (scavangeIndex == index)
-                        {
-                            scavangeIndex++;
-                        }
-                    }
-                }
-
-                //Third: Take required memory
-                if (scavangeIndex == _priorities)
-                {
-                    scavangeIndex = 0;
-                    if (scavangeIndex == index)
-                    {
-                        scavangeIndex++;
-                    }
-                    while (scavangeIndex < _priorities && reclaimed <= requriedSize)
-                    {
-                        currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-
-                        if (currentMax > 0 )
-                        {
-                            long newMax = currentMax;
-                            // Just take the amount of space required for this message.
-                            if (newMax > requriedSize)
-                            {
-                                newMax = requriedSize;
-                            }
-                            _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
-                            reclaimed += currentMax - newMax;
-                            if (_log.isDebugEnabled())
-                            {
-                                _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
-                            }
-                            break;
-                        }
-                        else
-                        {
-                            scavangeIndex++;
-                            if (scavangeIndex == index)
-                            {
-                                scavangeIndex++;
-                            }
-                        }
-                    }
-                }
-            }
-
-            //Increment Maximum
-            if (reclaimed > 0)
-            {
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
-                               + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
-                }
-                _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
-            }
-            else
-            {
-                _log.debug("No space found.");
-            }
-
-            if (_log.isTraceEnabled())
-            {
-                showUsage("Add");
-            }
-        }
-
         return _priorityLists[index].add(message);
     }
 
-    @Override
-    protected void showUsage(String prefix)
-    {
-        if (_log.isDebugEnabled())
-        {
-            if (prefix.length() != 0)
-            {
-                _log.debug(prefix);
-            }
-            for (int index = 0; index < _priorities; index++)
-            {
-                QueueEntryList queueEntryList = _priorityLists[index];
-                _log.debug("Queue (" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
-                           + "/" + queueEntryList.getMemoryUsageMaximum()
-                           + "/" + queueEntryList.dataSize());
-            }
-        }
-    }
 
     public QueueEntry next(QueueEntry node)
     {
@@ -338,122 +165,7 @@
         }
     }
 
-    @Override
-    public boolean isFlowed()
-    {
-        boolean flowed = false;
-        boolean full = true;
-
-        if (_log.isTraceEnabled())
-        {
-            showUsage("isFlowed");
-        }
-
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
-            full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize();
-            flowed = flowed || (queueEntryList.isFlowed());
-        }
-        return flowed && full;
-    }
-
-    @Override
-    public int size()
-    {
-        int size = 0;
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            size += queueEntryList.size();
-        }
-
-        return size;
-    }
-
-    @Override
-    public long dataSize()
-    {
-        int dataSize = 0;
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            dataSize += queueEntryList.dataSize();
-        }
-
-        return dataSize;
-    }
-
-    @Override
-    public long memoryUsed()
-    {
-        int memoryUsed = 0;
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            memoryUsed += queueEntryList.memoryUsed();
-        }
-
-        return memoryUsed;
-    }
-
-    @Override
-    public void setMemoryUsageMaximum(long maximumMemoryUsage)
-    {
-        _memoryUsageMaximum = maximumMemoryUsage;
-
-        if (maximumMemoryUsage >= 0)
-        {
-            _disableFlowToDisk = false;
-        }
-
-        long share = maximumMemoryUsage / _priorities;
-
-        //Apply a share of the maximum To each prioirty quue
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            queueEntryList.setMemoryUsageMaximum(share);
-        }
-
-        if (maximumMemoryUsage < 0)
-        {
-            if (_log.isInfoEnabled())
-            {
-                _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
-            }
-            _disableFlowToDisk = true;
-            return;
-        }
-
-        //ensure we use the full allocation of memory
-        long remainder = maximumMemoryUsage - (share * _priorities);
-        if (remainder > 0)
-        {
-            _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
-        }
-    }
-
-    @Override
-    public long getMemoryUsageMaximum()
-    {
-        return _memoryUsageMaximum;
-    }
-
-    @Override
-    public void setMemoryUsageMinimum(long minimumMemoryUsage)
-    {
-        _memoryUsageMinimum = minimumMemoryUsage;
-
-        //Apply a share of the minimum To each prioirty quue
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
-        }
-    }
-
-    @Override
-    public long getMemoryUsageMinimum()
-    {
-        return _memoryUsageMinimum;
-    }
-
+  
     @Override
     public void stop()
     {

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=758758&r1=758757&r2=758758&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Thu Mar 26 17:27:26 2009
@@ -155,6 +155,13 @@
         return (_flags & DELIVERED_TO_CONSUMER) != 0;
     }
 
+    /**
+     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+     * And for selector efficiency.
+     *
+     * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should
+     * only be called after the message has been sent.
+     */    
     public void setDeliveredToSubscription()
     {
         _flags |= DELIVERED_TO_CONSUMER;

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=758758&r1=758757&r2=758758&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
(original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
Thu Mar 26 17:27:26 2009
@@ -63,4 +63,14 @@
     void entryLoadedUpdateMemory(QueueEntry queueEntry);
 
     void stop();
+
+    /**
+     * Mark this queue as part of another QueueEntryList for accounting purposes.
+     *
+     * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+     * a parent QueueEntrylist upon which the action should take place.
+     *
+     * @param queueEntryList The parent queue that is performing accounting.
+     */
+    void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList);
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java?rev=758758&r1=758757&r2=758758&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
(original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
Thu Mar 26 17:27:26 2009
@@ -50,21 +50,4 @@
             fail(e.getMessage());
         }
     }
-
-    @Override
-    public void testQueueValuesAfterCreation()
-    {
-        try
-        {
-            AMQQueue queue = createQueue();
-
-            assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum());
-            //NOTE: Priority queue will show 0 as minimum as the minimum value is actually spread between its sub QELs
-            assertEquals("MemoryMinimumSize not 0 as expected for a priority queue:", 0, queue.getMemoryUsageMinimum());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
-    }
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=758758&r1=758757&r2=758758&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
(original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
Thu Mar 26 17:27:26 2009
@@ -48,11 +48,4 @@
         _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID);
         _contentBodies = new ArrayList<ContentChunk>();
     }
-
-
-    @Override
-    public long getSize()
-    {
-        return 0l;
-    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message