qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r683617 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h SemanticState.cpp
Date Thu, 07 Aug 2008 13:45:24 GMT
Author: aconway
Date: Thu Aug  7 06:45:24 2008
New Revision: 683617

URL: http://svn.apache.org/viewvc?rev=683617&view=rev
Log:
Patch from Gordon Sim to fix issues with hasOutput implementation.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=683617&r1=683616&r2=683617&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Aug  7 06:45:24 2008
@@ -212,9 +212,30 @@
     }
 }
 
-bool Queue::empty() const {
+bool Queue::checkForMessages(Consumer& c)
+{
     Mutex::ScopedLock locker(messageLock);
-    return messages.empty();
+    if (messages.empty()) {
+        //no message available, register consumer for notification
+        //when this changes
+        addListener(c);
+        return false;
+    } else {
+        QueuedMessage msg = messages.front();
+        if (store && !msg.payload->isEnqueueComplete()) {
+            //though a message is on the queue, it has not yet been
+            //enqueued and so is not available for consumption yet,
+            //register consumer for notification when this changes
+            addListener(c);
+            return false;            
+        } else {
+            //check that consumer has sufficient credit for the
+            //message (if it does not, no need to register it for
+            //notification as the consumer itself will handle the
+            //credit allocation required to change this condition).
+            return c.accept(msg.payload);
+        }
+    }
 }
 
 bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=683617&r1=683616&r2=683617&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Aug  7 06:45:24 2008
@@ -107,6 +107,7 @@
 
             void notify();
             void removeListener(Consumer&);
+            void addListener(Consumer&);
 
             bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
@@ -114,8 +115,6 @@
             void popAndDequeue();
 
         public:
-            // FIXME aconway 2008-08-06: was private, verify if needed public.
-            void addListener(Consumer&);
 
             virtual void notifyDurableIOComplete();
             typedef boost::shared_ptr<Queue> shared_ptr;
@@ -128,9 +127,14 @@
                   management::Manageable* parent = 0);
             ~Queue();
 
-            bool empty() const;
-            
             bool dispatch(Consumer&);
+            /**
+             * Check whether there would be a message available for
+             * dispatch to this consumer. If not, the consumer will be
+             * notified of events that may have changed this
+             * situation.
+             */
+            bool checkForMessages(Consumer&);
 
             void create(const qpid::framing::FieldTable& settings);
             void configure(const qpid::framing::FieldTable& settings);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=683617&r1=683616&r2=683617&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Aug  7 06:45:24 2008
@@ -591,8 +591,7 @@
 }
 
 bool SemanticState::ConsumerImpl::hasOutput() {
-    queue->addListener(*this);
-    return !queue->empty();
+    return queue->checkForMessages(*this);
 }
 
 bool SemanticState::ConsumerImpl::doOutput()



Mime
View raw message