qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r721256 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/IncompleteMessageList.cpp qpid/broker/IncompleteMessageList.h tests/IncompleteMessageList.cpp
Date Thu, 27 Nov 2008 18:43:01 GMT
Author: gsim
Date: Thu Nov 27 10:43:00 2008
New Revision: 721256

URL: http://svn.apache.org/viewvc?rev=721256&view=rev
Log:
Ensure broker doesn't hang waiting for async enqueue to complete on shutdown.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
    incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=721256&r1=721255&r2=721256&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Thu Nov 27 10:43:00
2008
@@ -25,13 +25,20 @@
 namespace broker {
 
 IncompleteMessageList::IncompleteMessageList() :
-    callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1))
+    callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1)), closed(false)
 {}
 
 IncompleteMessageList::~IncompleteMessageList() 
 {
+    close();
+}
+
+void IncompleteMessageList::close() 
+{
     sys::Mutex::ScopedLock l(lock);
+    closed = true;
     std::for_each(incomplete.begin(), incomplete.end(), boost::bind(&Message::resetEnqueueCompleteCallback,
_1));
+    lock.notify();
 }
 
 void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
@@ -57,8 +64,9 @@
                     sys::Mutex::ScopedUnlock u(lock);
                     msg->flush(); // Can re-enter IncompleteMessageList::enqueueComplete
                 }
-                while (!msg->isEnqueueComplete())
+                while (!msg->isEnqueueComplete() && !closed)
                     lock.wait();
+                if (closed) return;
             } else {
                 //leave the message as incomplete for now
                 return;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h?rev=721256&r1=721255&r2=721256&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h Thu Nov 27 10:43:00
2008
@@ -39,6 +39,7 @@
     sys::Monitor lock;
     Messages incomplete;
     Message::MessageCallback callback;
+    bool closed;
 
 public:
     typedef Message::MessageCallback CompletionListener;    
@@ -46,6 +47,7 @@
     IncompleteMessageList();
     ~IncompleteMessageList();
     
+    void close();
     void add(boost::intrusive_ptr<Message> msg);
     void process(const CompletionListener& l, bool sync);
     void each(const CompletionListener& l);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp?rev=721256&r1=721255&r2=721256&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp Thu Nov 27 10:43:00
2008
@@ -24,6 +24,8 @@
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
 
 #include "unit_test.h"
 
@@ -93,7 +95,6 @@
     list.process(Checker(3, 5), false);    
 }
 
-
 struct MockStore : public NullMessageStore
 {
     Queue::shared_ptr queue;
@@ -125,4 +126,38 @@
     list.process(Checker(1, 5), true);
 }
 
+struct AsyncProcessor : qpid::sys::Runnable
+{
+    Checker checker;
+    IncompleteMessageList& list;
+
+    AsyncProcessor(uint start, uint end, IncompleteMessageList& list_) : checker(start,
end), list(list_) {}
+    
+    void run() 
+    {
+        list.process(checker, true);
+    }
+};
+
+QPID_AUTO_TEST_CASE(testSyncProcessInterruptedOnClose)
+{
+    IncompleteMessageList list;
+    SequenceNumber counter(1);
+    NullMessageStore store;
+    Queue::shared_ptr queue(new Queue("mock-queue"));
+    //fill up list with messages
+    for (int i = 0; i < 5; i++) {
+        boost::intrusive_ptr<Message> msg(new Message(counter++));
+        list.add(msg);
+        if (i == 2) {
+            //mark a message in the middle as incomplete
+            msg->enqueueAsync(queue, &store);
+        }
+    }
+    AsyncProcessor ap(1, 2, list);
+    qpid::sys::Thread thread(ap);
+    list.close();
+    thread.join();
+}
+
 QPID_AUTO_TEST_SUITE_END()



Mime
View raw message