qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r960951 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/SessionImpl.cpp tests/MessagingThreadTests.cpp
Date Tue, 06 Jul 2010 17:27:58 GMT
Author: gsim
Date: Tue Jul  6 17:27:58 2010
New Revision: 960951

URL: http://svn.apache.org/viewvc?rev=960951&view=rev
Log:
QPID-664: Don't hold lock while waiting for incoming message in nextReceiver() call.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=960951&r1=960950&r2=960951&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Jul  6 17:27:58 2010
@@ -323,11 +323,11 @@ bool SessionImpl::get(ReceiverImpl& rece
 
 bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     while (true) {
         try {
             std::string destination;
             if (incoming.getNextDestination(destination, adjust(timeout))) {
+                qpid::sys::Mutex::ScopedLock l(lock);
                 Receivers::const_iterator i = receivers.find(destination);
                 if (i == receivers.end()) {
                     throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination));

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp?rev=960951&r1=960950&r2=960951&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp Tue Jul  6 17:27:58 2010
@@ -54,6 +54,25 @@ struct ReceiveThread : public sys::Runna
     }
 };
 
+struct NextReceiverThread : public sys::Runnable {
+    Session session;
+    vector<string> received;
+    string error;
+
+    NextReceiverThread(Session s) : session(s) {}
+    void run() {
+        try {
+            while(true) {
+                Message m = session.nextReceiver(Duration::SECOND*5).fetch();
+                if (m.getContent() == "END") break;
+                received.push_back(m.getContent());
+            }
+        } catch (const std::exception& e) {
+            error = e.what();
+        }
+    }
+};
+
 
 QPID_AUTO_TEST_CASE(testConcurrentSendReceive) {
     MessagingFixture fix;
@@ -103,5 +122,23 @@ QPID_AUTO_TEST_CASE(testCloseSessionBusy
     BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable);
 }
 
+QPID_AUTO_TEST_CASE(testConcurrentSendNextReceiver) {
+    MessagingFixture fix;
+    Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}");
+    const size_t COUNT=100;
+    r.setCapacity(COUNT);
+    NextReceiverThread rt(fix.session);
+    sys::Thread thread(rt);
+    sys::usleep(1000);          // Give the receive thread time to block.
+    Sender s = fix.session.createSender("concurrent;{create:always}");
+    for (size_t i = 0; i < COUNT; ++i) {
+        s.send(Message());
+    }
+    s.send(Message("END"));
+    thread.join();
+    BOOST_CHECK_EQUAL(rt.error, string());
+    BOOST_CHECK_EQUAL(COUNT, rt.received.size());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message