qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject svn commit: r1642681 - in /qpid/trunk/qpid/cpp/src/qpid: broker/QueueCleaner.cpp sys/PollableQueue.h
Date Mon, 01 Dec 2014 13:32:49 GMT
Author: chug
Date: Mon Dec  1 13:32:48 2014
New Revision: 1642681

URL: http://svn.apache.org/r1642681
Log:
QPID-6213: qpidd misses heartbeats

* Pollable queue breaks when client does not process whole batch.
* QueueCleaner must not reschedule same task multiple times.
* QueueCleaner breaks out of batch processing on wall clock time interval.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1642681&r1=1642680&r2=1642681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Mon Dec  1 13:32:48 2014
@@ -23,6 +23,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
 
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
@@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::T
 
 void QueueCleaner::fired()
 {
+    QPID_LOG(debug, "QueueCleaner::fired: requesting purge");
     queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
-    QPID_LOG(debug, "Requested purge of queues");
+    task->restart(); // Update task restart time to now()+interval
+    timer->add(task);
 }
 
 QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs&
batch)
 {
-    for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
-        (*i)->purgeExpired(period);
-    }
-    QPID_LOG(debug, "Purged " << batch.size() << " queues");
-    if (purging.empty()) {
-        task->restart();
-        timer->add(task);
-        QPID_LOG(debug, "Restarted purge timer");
+    const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC);
+    int nPurged = 0;
+    QueuePtrs::const_iterator batchItr = batch.begin();
+    for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr)
{
+        task->restart(); // Update task restart time to now()+interval
+        (*batchItr)->purgeExpired(period);
+        nPurged++;
     }
-    return batch.end();
+    QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " <<
batch.size() << " queues");
+    task->restart(); // Update task restart time to now()+interval
+    return batchItr;
 }
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=1642681&r1=1642680&r2=1642681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Mon Dec  1 13:32:48 2014
@@ -143,7 +143,7 @@ template <class T> void PollableQueue<T>
 
 template <class T> void PollableQueue<T>::process() {
     // Called with lock held
-    while (!stopped && !queue.empty()) {
+    if (!stopped && !queue.empty()) {
         assert(batch.empty());
         batch.swap(queue);
         typename Batch::const_iterator putBack;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message