qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r584172 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h SemanticState.cpp SemanticState.h
Date Fri, 12 Oct 2007 14:52:38 GMT
Author: gsim
Date: Fri Oct 12 07:52:36 2007
New Revision: 584172

URL: http://svn.apache.org/viewvc?rev=584172&view=rev
Log:
Further fixes to locking between queue and semantic state to avoid deadlocking.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 12 07:52:36 2007
@@ -124,21 +124,20 @@
     return false;
 }
 
-void Queue::requestDispatch(Consumer* c, bool sync){
+void Queue::requestDispatch(Consumer* c){
     if (!c || c->preAcquires()) {
-        if (sync) {
-	    Mutex::ScopedLock locker(messageLock);
-            dispatch();
-        } else {
-            serializer.execute(dispatchCallback);
-        }
+        serializer.execute(dispatchCallback);
     } else {
-        //note: this is always done on the callers thread, regardless
-        //      of sync; browsers of large queues should use flow control!
         serviceBrowser(c);
     }
 }
 
+void Queue::flush(DispatchCompletion& completion)
+{
+    DispatchFunctor f(*this, &completion);
+    serializer.execute(f);
+}
+
 Consumer* Queue::allocate()
 {
     RWlock::ScopedWlock locker(consumerLock);
@@ -179,9 +178,18 @@
         } else {            
             break;
         }	
-     }    
-     RWlock::ScopedRlock locker(consumerLock);
-     for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {         
+     }
+     serviceAllBrowsers();
+}
+
+void Queue::serviceAllBrowsers()
+{
+     Consumers copy;
+     {
+         RWlock::ScopedRlock locker(consumerLock);
+         copy = browsers;
+     }
+     for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
          serviceBrowser(*i);
      }
 }
@@ -428,3 +436,4 @@
 {
     return alternateExchange;
 }
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 12 07:52:36 2007
@@ -48,20 +48,32 @@
 
         using std::string;
 
+        struct DispatchCompletion 
+        {
+            virtual ~DispatchCompletion() {}
+            virtual void completed() = 0;
+        };
+
         /**
          * The brokers representation of an amqp queue. Messages are
          * delivered to a queue from where they can be dispatched to
          * registered consumers or be stored until dequeued or until one
          * or more consumers registers.
          */
-        class Queue : public PersistableQueue{
+        class Queue : public PersistableQueue {
             typedef std::vector<Consumer*> Consumers;
             typedef std::deque<QueuedMessage> Messages;
             
-            struct DispatchFunctor {
+            struct DispatchFunctor 
+            {
                 Queue& queue;
-                DispatchFunctor(Queue& q) : queue(q) {}
-                void operator()() { queue.dispatch(); }
+                DispatchCompletion* sync;
+                DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s)
{}
+                void operator()()
+                {
+                    queue.dispatch(); 
+                    if (sync) sync->completed();
+                }
             };
                 
             const string name;
@@ -93,6 +105,7 @@
 	     */
             void dispatch();
             void cancel(Consumer* c, Consumers& set);
+            void serviceAllBrowsers();
             void serviceBrowser(Consumer* c);
             Consumer* allocate();
             bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
@@ -149,7 +162,8 @@
              * at any time, so this call schedules the despatch based on
 	     * the serilizer policy.
              */
-            void requestDispatch(Consumer* c = 0, bool sync = false);
+            void requestDispatch(Consumer* c = 0);
+            void flush(DispatchCompletion& callback);
             void consume(Consumer* c, bool exclusive = false);
             void cancel(Consumer* c);
             uint32_t purge();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 12 07:52:36 2007
@@ -346,38 +346,40 @@
 
 void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
 {
-    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent
delivery
-    
-    ack_iterator start = cumulative ? unacked.begin() : 
-        find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter),
first));
-    ack_iterator end = start;
-     
-    if (cumulative || first != last) {
-        //need to find end (position it just after the last record in range)
-        end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after),
last));
-    } else {
-        //just acked single element (move end past it)
-        ++end;
-    }
-
-    for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
-    
-    if (txBuffer.get()) {
-        //in transactional mode, don't dequeue or remove, just
-        //maintain set of acknowledged messages:
-        accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+    {
+        Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent
delivery
         
-        if (dtxBuffer.get()) {
-            //if enlisted in a dtx, remove the relevant slice from
-            //unacked and record it against that transaction
-            TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
-            accumulatedAck.clear();
-            dtxBuffer->enlist(txAck);    
+        ack_iterator start = cumulative ? unacked.begin() : 
+            find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter),
first));
+        ack_iterator end = start;
+        
+        if (cumulative || first != last) {
+            //need to find end (position it just after the last record in range)
+            end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after),
last));
+        } else {
+            //just acked single element (move end past it)
+            ++end;
         }
-    } else {
-        for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
-        unacked.erase(start, end);
-    }
+        
+        for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+        
+        if (txBuffer.get()) {
+            //in transactional mode, don't dequeue or remove, just
+            //maintain set of acknowledged messages:
+            accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+            
+            if (dtxBuffer.get()) {
+                //if enlisted in a dtx, remove the relevant slice from
+                //unacked and record it against that transaction
+                TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+                accumulatedAck.clear();
+                dtxBuffer->enlist(txAck);    
+            }
+        } else {
+            for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+            unacked.erase(start, end);
+        }
+    }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
     
     //if the prefetch limit had previously been reached, or credit
     //had expired in windowing mode there may be messages that can
@@ -525,12 +527,10 @@
 void SemanticState::ConsumerImpl::flush()
 {
     //need to prevent delivery after requestDispatch returns but
-    //before credit is reduced to zero; TODO: come up with better
-    //implementation of flush.
-    Mutex::ScopedLock l(lock);
-    queue->requestDispatch(this, true);
-    byteCredit = 0;
-    msgCredit = 0;
+    //before credit is reduced to zero
+    FlushCompletion completion(*this);
+    queue->flush(completion);
+    completion.wait();
 }
 
 void SemanticState::ConsumerImpl::stop()
@@ -597,6 +597,21 @@
     for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
     //need to remove the delivery records as well
     unacked.erase(range.start, range.end);
+}
+
+
+void SemanticState::FlushCompletion::wait()
+{
+    Monitor::ScopedLock locker(lock);
+    while (!complete) lock.wait();
+}
+
+void SemanticState::FlushCompletion::completed()
+{
+    Monitor::ScopedLock locker(lock);
+    consumer.stop();
+    complete = true;
+    lock.notifyAll();
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Oct 12 07:52:36 2007
@@ -89,6 +89,17 @@
         void acknowledged(const DeliveryRecord&);    
     };
 
+    struct FlushCompletion : DispatchCompletion
+    {
+        sys::Monitor lock;
+        ConsumerImpl& consumer;
+        bool complete;
+        
+        FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {}
+        void wait();
+        void completed();
+    };
+
     typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
 
     SessionState& session;



Mime
View raw message