qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1477975 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/IncomingMessages.cpp qpid/client/amqp0_10/IncomingMessages.h tests/MessagingSessionTests.cpp
Date Wed, 01 May 2013 12:36:05 GMT
Author: gsim
Date: Wed May  1 12:36:05 2013
New Revision: 1477975

URL: http://svn.apache.org/r1477975
Log:
QPID-4786: Only have one thread processing session queue at a time

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1477975&r1=1477974&r2=1477975&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Wed May  1 12:36:05
2013
@@ -100,8 +100,24 @@ struct Match
         }
     }
 };
+
+struct ScopedRelease
+{
+    bool& flag;
+    qpid::sys::Monitor& lock;
+
+    ScopedRelease(bool& f, qpid::sys::Monitor& l) : flag(f), lock(l) {}
+    ~ScopedRelease()
+    {
+        sys::Monitor::ScopedLock l(lock);
+        flag = false;
+        lock.notifyAll();
+    }
+};
 }
 
+IncomingMessages::IncomingMessages() : inUse(false) {}
+
 void IncomingMessages::setSession(qpid::client::AsyncSession s)
 {
     sys::Mutex::ScopedLock l(lock);
@@ -110,10 +126,11 @@ void IncomingMessages::setSession(qpid::
     acceptTracker.reset();
 }
 
-bool IncomingMessages::get(Handler& handler, Duration timeout)
+bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
 {
-    {
-        sys::Mutex::ScopedLock l(lock);
+    sys::Mutex::ScopedLock l(lock);
+    AbsTime deadline(AbsTime::now(), timeout);
+    do {
         //search through received list for any transfer of interest:
         for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
         {
@@ -123,19 +140,42 @@ bool IncomingMessages::get(Handler& hand
                 return true;
             }
         }
-    }
-    //none found, check incoming:
-    return process(&handler, timeout);
+        if (inUse) {
+            //someone is already waiting on the incoming session queue, wait for them to
finish
+            lock.wait(deadline);
+        } else {
+            inUse = true;
+            ScopedRelease release(inUse, lock);
+            sys::Mutex::ScopedUnlock l(lock);
+            //wait for suitable new message to arrive
+            return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE
: qpid::sys::Duration(AbsTime::now(), deadline));
+        }
+    } while (AbsTime::now() < deadline);
+    return false;
 }
 
-bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
+bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration
timeout)
 {
     sys::Mutex::ScopedLock l(lock);
-    //if there is not already a received message, we must wait for one
-    if (received.empty() && !wait(timeout)) return false;
-    //else we have a message in received; return the corresponding destination
-    destination = received.front()->as<MessageTransferBody>()->getDestination();
-    return true;
+    AbsTime deadline(AbsTime::now(), timeout);
+    while (received.empty() && AbsTime::now() < deadline) {
+        if (inUse) {
+            //someone is already waiting on the sessions incoming queue
+            lock.wait(deadline);
+        } else {
+            inUse = true;
+            ScopedRelease release(inUse, lock);
+            sys::Mutex::ScopedUnlock l(lock);
+            //wait for an incoming message
+            wait(timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(),
deadline));
+        }
+    }
+    if (!received.empty()) {
+        destination = received.front()->as<MessageTransferBody>()->getDestination();
+        return true;
+    } else {
+        return false;
+    }
 }
 
 void IncomingMessages::accept()
@@ -206,6 +246,7 @@ bool IncomingMessages::process(Handler* 
                     QPID_LOG(debug, "Pushed " << *content->getMethod() <<
" to received queue");
                     sys::Mutex::ScopedLock l(lock);
                     received.push_back(content);
+                    lock.notifyAll();
                 }
             } else {
                 //TODO: handle other types of commands (e.g. message-accept, message-flow
etc)
@@ -225,6 +266,7 @@ bool IncomingMessages::wait(qpid::sys::D
             QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received
queue");
             sys::Mutex::ScopedLock l(lock);
             received.push_back(content);
+            lock.notifyAll();
             return true;
         } else {
             //TODO: handle other types of commands (e.g. message-accept, message-flow etc)

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1477975&r1=1477974&r2=1477975&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Wed May  1 12:36:05 2013
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -68,6 +68,7 @@ class IncomingMessages
         virtual bool accept(MessageTransfer& transfer) = 0;
     };
 
+    IncomingMessages();
     void setSession(qpid::client::AsyncSession session);
     bool get(Handler& handler, qpid::sys::Duration timeout);
     bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
@@ -84,9 +85,10 @@ class IncomingMessages
   private:
     typedef std::deque<FrameSetPtr> FrameSetQueue;
 
-    sys::Mutex lock;
+    sys::Monitor lock;
     qpid::client::AsyncSession session;
     boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
+    bool inUse;
     FrameSetQueue received;
     AcceptTracker acceptTracker;
 

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1477975&r1=1477974&r2=1477975&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed May  1 12:36:05 2013
@@ -1217,6 +1217,43 @@ QPID_AUTO_TEST_CASE(testLinkBindingClean
     BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
 }
 
+namespace {
+struct Fetcher : public qpid::sys::Runnable {
+    Receiver receiver;
+    Message message;
+    bool result;
+
+    Fetcher(Receiver r) : receiver(r), result(false) {}
+    void run()
+    {
+        result = receiver.fetch(message, Duration::SECOND*10);
+    }
+};
+}
+
+QPID_AUTO_TEST_CASE(testConcurrentFetch)
+{
+    MessagingFixture fix;
+    Sender sender = fix.session.createSender("my-test-queue;{create:always, node : { x-declare
: { auto-delete: true}}}");
+    Receiver receiver = fix.session.createReceiver("my-test-queue");
+    Fetcher fetcher(fix.session.createReceiver("amq.fanout"));
+    qpid::sys::Thread runner(fetcher);
+    Message out("test-message");
+    for (int i = 0; i < 10; i++) {//try several times to make sure
+        sender.send(out, true);
+        //since the message is now on the queue, it should take less than the timeout to
actually fetch it
+        qpid::sys::AbsTime start = qpid::sys::AbsTime::now();
+        Message in;
+        BOOST_CHECK(receiver.fetch(in, qpid::messaging::Duration::SECOND*2));
+        qpid::sys::Duration time(start, qpid::sys::AbsTime::now());
+        BOOST_CHECK(time < qpid::sys::TIME_SEC*2);
+        if (time >= qpid::sys::TIME_SEC*2) break;//if we failed, no need to keep testing
+    }
+    fix.session.createSender("amq.fanout").send(out);
+    runner.join();
+    BOOST_CHECK(fetcher.result);
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message