Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 65676 invoked from network); 6 Jul 2010 17:29:25 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 6 Jul 2010 17:29:25 -0000 Received: (qmail 82029 invoked by uid 500); 6 Jul 2010 17:29:25 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 81979 invoked by uid 500); 6 Jul 2010 17:29:24 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 81971 invoked by uid 99); 6 Jul 2010 17:29:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jul 2010 17:29:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jul 2010 17:29:21 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 959E82388906; Tue, 6 Jul 2010 17:27:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r960951 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/SessionImpl.cpp tests/MessagingThreadTests.cpp Date: Tue, 06 Jul 2010 17:27:58 -0000 To: commits@qpid.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100706172758.959E82388906@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Tue Jul 6 17:27:58 2010 New Revision: 960951 URL: http://svn.apache.org/viewvc?rev=960951&view=rev Log: QPID-664: Don't hold lock while waiting for incoming message in nextReceiver() call. 
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=960951&r1=960950&r2=960951&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Jul 6 17:27:58 2010 @@ -323,11 +323,11 @@ bool SessionImpl::get(ReceiverImpl& rece bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { - qpid::sys::Mutex::ScopedLock l(lock); while (true) { try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { + qpid::sys::Mutex::ScopedLock l(lock); Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); Modified: qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp?rev=960951&r1=960950&r2=960951&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp Tue Jul 6 17:27:58 2010 @@ -54,6 +54,25 @@ struct ReceiveThread : public sys::Runna } }; +struct NextReceiverThread : public sys::Runnable { + Session session; + vector<string> received; + string error; + + NextReceiverThread(Session s) : session(s) {} + void run() { + try { + while(true) { + Message m = session.nextReceiver(Duration::SECOND*5).fetch(); + if (m.getContent() == "END") break; + received.push_back(m.getContent()); + } + } catch (const std::exception& e) { 
+ error = e.what(); + } + } +}; + QPID_AUTO_TEST_CASE(testConcurrentSendReceive) { MessagingFixture fix; @@ -103,5 +122,23 @@ QPID_AUTO_TEST_CASE(testCloseSessionBusy BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); } +QPID_AUTO_TEST_CASE(testConcurrentSendNextReceiver) { + MessagingFixture fix; + Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}"); + const size_t COUNT=100; + r.setCapacity(COUNT); + NextReceiverThread rt(fix.session); + sys::Thread thread(rt); + sys::usleep(1000); // Give the receive thread time to block. + Sender s = fix.session.createSender("concurrent;{create:always}"); + for (size_t i = 0; i < COUNT; ++i) { + s.send(Message()); + } + s.send(Message("END")); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); + BOOST_CHECK_EQUAL(COUNT, rt.received.size()); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org