qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1348090 - /qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
Date Fri, 08 Jun 2012 14:32:16 GMT
Author: kgiusti
Date: Fri Jun  8 14:32:15 2012
New Revision: 1348090

URL: http://svn.apache.org/viewvc?rev=1348090&view=rev
Log:
QPID-4046: rate limit the release of dequeued messages.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1348090&r1=1348089&r2=1348090&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp Fri Jun  8 14:32:15 2012
@@ -40,13 +40,16 @@ size_t MessageDeque::index(const framing
 bool MessageDeque::deleted(const QueuedMessage& m)
 {
     size_t i = index(m.position);
-    if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
-        messages[i].status = QueuedMessage::DELETED;
-        clean();
-        return true;
-    } else {
-        return false;
+    if (i < messages.size()) {
+        QueuedMessage *qm = &messages[i];
+        if (qm->status != QueuedMessage::DELETED) {
+            qm->status = QueuedMessage::DELETED;
+            qm->payload.reset(); // message no longer needed
+            clean();
+            return true;
+        }
     }
+    return false;
 }
 
 size_t MessageDeque::size()
@@ -144,6 +147,7 @@ QueuedMessage* MessageDeque::pushPtr(con
     messages.back().status = QueuedMessage::AVAILABLE;
     if (head >= messages.size()) head = messages.size() - 1;
     ++available;
+    clean();  // QPID-4046: let producer help clean the backlog of deleted messages
     return &messages.back();
 }
 
@@ -195,10 +199,15 @@ void MessageDeque::setPosition(const fra
 
 void MessageDeque::clean()
 {
-    while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+    // QPID-4046: If a queue has multiple consumers, then it is possible for a large
+    // collection of deleted messages to build up.  Limit the number of messages cleaned
+    // up on each call to clean().
+    size_t count = 0;
+    while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
         messages.pop_front();
-        if (head) --head;
+        count += 1;
     }
+    head = (head > count) ? head - count : 0;
 }
 
 void MessageDeque::foreach(Functor f)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message