qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r584144 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h
Date Fri, 12 Oct 2007 12:08:40 GMT
Author: gsim
Date: Fri Oct 12 05:08:40 2007
New Revision: 584144

URL: http://svn.apache.org/viewvc?rev=584144&view=rev
Log:
Some fixes to locking within the queue (preventing locks being held during delivery to a consumer)


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=584144&r1=584143&r2=584144&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 12 05:08:40 2007
@@ -139,32 +139,32 @@
     }
 }
 
-bool Queue::dispatch(QueuedMessage& msg){
-
- 
-    RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
+Consumer* Queue::allocate()
+{
+    RWlock::ScopedWlock locker(consumerLock);
  
     if(acquirers.empty()){
-        return false;
+        return 0;
     }else if(exclusive){
-        return exclusive->deliver(msg);
+        return exclusive;
     }else{
-        //deliver to next consumer
         next = next % acquirers.size();
-        Consumer* c = acquirers[next];
-        int start = next;
-        while(c){
-            next++;
-            if(c->deliver(msg)) {
-                return true;            
-            }
-            next = next % acquirers.size();
-            c = next == start ? 0 : acquirers[next];            
-        }
-        return false;
+        return acquirers[next++];
     }
 }
 
+bool Queue::dispatch(QueuedMessage& msg)
+{
+    Consumer* c = allocate();
+    int start = next;
+    while(c){
+        if(c->deliver(msg)) {
+            return true;            
+        }
+        c = next == start ? 0 : allocate();            
+    }
+    return false;
+}
 
 void Queue::dispatch(){
      QueuedMessage msg;
@@ -188,27 +188,22 @@
 
 void Queue::serviceBrowser(Consumer* browser)
 {
-    //This is a poorly performing implementation:
-    //
-    //  * bad concurrency where browsers exist
-    //  * inefficient for largish queues
-    //
-    //The queue needs to be based on a current data structure that
-    //does not invalidate iterators when modified. Subscribers could
-    //then use an iterator to continue from where they left off
+    QueuedMessage msg;
+    while (seek(msg, browser->position) && browser->deliver(msg)) {
+        browser->position = msg.position;
+    }
+}
 
+bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
     Mutex::ScopedLock locker(messageLock);
-    if (!messages.empty() && messages.back().position > browser->position) {
-        for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
-            if (i->position > browser->position) {
-                if (browser->deliver(*i)) {
-                    browser->position = i->position;
-                } else {
-                    break;
-                }
-            }
-        }
+    if (!messages.empty() && messages.back().position > position) {
+        uint index = (position - messages.front().position) + 1;
+        if (index < messages.size()) {
+            msg = messages[index];
+            return true;
+        } 
     }
+    return false;
 }
 
 void Queue::consume(Consumer* c, bool requestExclusive){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=584144&r1=584143&r2=584144&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 12 05:08:40 2007
@@ -94,6 +94,8 @@
             void dispatch();
             void cancel(Consumer* c, Consumers& set);
             void serviceBrowser(Consumer* c);
+            Consumer* allocate();
+            bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
  
         protected:
 	   /**



Mime
View raw message