qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1173697 - in /qpid/branches/qpid-2920-active/qpid/cpp/src: qpid/broker/ qpid/cluster/exp/ tests/
Date Wed, 21 Sep 2011 14:56:21 GMT
Author: aconway
Date: Wed Sep 21 14:56:20 2011
New Revision: 1173697

URL: http://svn.apache.org/viewvc?rev=1173697&view=rev
Log:
QPID-2920: Fixed race condition around Queue::listeners.

- moved call to cluster dequeue, no deferred dequeue.
- removed unused function broker::Cluster::empty

Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h Wed Sep 21 14:56:20 2011
@@ -82,8 +82,9 @@ class Cluster
     virtual void consume(Queue&, size_t consumerCount) = 0;
     /** A consumer cancels its subscription to a queue */
     virtual void cancel(Queue&, size_t consumerCount) = 0;
-    /** A queue becomes empty */
-    virtual void empty(Queue&) = 0;
+
+    // Queues
+
     /** A queue has been stopped */
     virtual void stopped(Queue&) = 0;
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h Wed Sep 21 14:56:20 2011
@@ -52,7 +52,6 @@ class NullCluster : public Cluster
     // Queues
 
     virtual void stopped(Queue&) {}
-    virtual void empty(Queue&) {}
 
     // Wiring
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Wed Sep 21 14:56:20 2011
@@ -304,14 +304,14 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
+        ClusterAcquireScope acquireScope; // Outside the lock
         Stoppable::Scope consumeScope(consuming);
+        Mutex::ScopedLock locker(messageLock);
         if (!consumeScope) {
             QPID_LOG(trace, "Queue stopped, can't  consume: " << name);
             listeners.addListener(c);
             return NO_MESSAGES;
         }
-        ClusterAcquireScope acquireScope; // Outside the lock
-        Mutex::ScopedLock locker(messageLock);
         if (messages->empty()) {
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
@@ -701,9 +701,6 @@ void Queue::enqueueAborted(boost::intrus
 // return true if store exists,
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
-    // FIXME aconway 2011-09-13: new cluster needs tx/dtx support.
-    if (!ctxt && broker) broker->getCluster().dequeue(msg);
-
     ScopedUse u(barrier);
     if (!u.acquired) return false;
     {
@@ -713,6 +710,7 @@ bool Queue::dequeue(TransactionContext* 
             dequeued(msg);
         }
     }
+    if (!ctxt && broker) broker->getCluster().dequeue(msg); // call outside the lock.
 
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
@@ -1291,7 +1289,7 @@ void Queue::startConsumers() {
     notifyListener();
 }
 
-// Called when all busy threads exitd due to stopConsumers()
+// Called when all busy threads exited after stopConsumers()
 void Queue::consumingStopped() {
     QPID_LOG(trace, "Stopped consumers on " << getName());
     if (broker) broker->getCluster().stopped(*this);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h Wed Sep 21 14:56:20 2011
@@ -21,8 +21,6 @@
  * under the License.
  *
  */
-#include "qpid/log/Statement.h" // FIXME  XXX aconway 2011-06-08: remove
-
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/OwnershipToken.h"
 #include "qpid/broker/Consumer.h"

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Wed Sep 21 14:56:20 2011
@@ -51,7 +51,7 @@ using namespace broker;
 
 namespace {
 // noReplicate means the current thread is handling a message
-// received from the cluster so it should not be replciated.
+// received from the cluster so it should not be replicated.
 QPID_TSS bool tssNoReplicate = false;
 
 // Routing ID of the message being routed in the current thread.
@@ -111,9 +111,6 @@ void BrokerContext::acquire(const broker
 }
 
 void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
-    // FIXME aconway 2011-09-15: should dequeue locally immediately
-    // instead of waiting for redeliver. No need for CPG order on
-    // dequeues.
     if (!tssNoReplicate)
         core.mcast(ClusterMessageDequeueBody(
                        ProtocolVersion(), qm.queue->getName(), qm.position));
@@ -177,25 +174,19 @@ void BrokerContext::unbind(broker::Queue
 }
 
 // n is the number of consumers including the one just added.
-// FIXME aconway 2011-06-27: rename, conflicting terms. subscribe?
 void BrokerContext::consume(broker::Queue& q, size_t n) {
     QueueContext::get(q)->consume(n);
 }
 
-// FIXME aconway 2011-09-13: rename unsubscribe?
 // n is the number of consumers after the cancel.
 void BrokerContext::cancel(broker::Queue& q, size_t n) {
     QueueContext::get(q)->cancel(n);
 }
 
-void BrokerContext::empty(broker::Queue& ) {
-    // FIXME aconway 2011-06-28: is this needed?
-}
-
 void BrokerContext::stopped(broker::Queue& q) {
     boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
     // Don't forward the stopped call if the queue does not yet have a
-    // cluster context this when the queue is first created locally.
+    // cluster context - this when the queue is first created locally.
     if (qc) qc->stopped();
 }
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h Wed Sep 21 14:56:20 2011
@@ -66,7 +66,6 @@ class BrokerContext : public broker::Clu
     void cancel(broker::Queue&, size_t);
 
     // Queues
-    void empty(broker::Queue&);
     void stopped(broker::Queue&);
 
     // Wiring

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h Wed Sep 21 14:56:20 2011
@@ -75,6 +75,18 @@ class LockedMap
         return map.erase(key);
     }
 
+    /** Remove and return value associated with key, returns Value() if none. */
+    Value pop(const Key& key) {
+        sys::RWlock::ScopedWlock w(lock);
+        Value value;
+        typename Map::iterator i = map.find(key);
+        if (i != map.end()) {
+            value = i->second;
+            map.erase(i);
+        }
+        return value;
+    }
+
   private:
     typedef std::map<Key, Value> Map;
     Map map;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Wed Sep 21 14:56:20 2011
@@ -105,7 +105,8 @@ void MessageHandler::acquire(const std::
         assert(qm.payload);
         // Save on context for possible requeue if released/rejected.
         QueueContext::get(*queue)->acquire(qm);
-        // FIXME aconway 2011-09-19: need to record by member-ID to  requeue if member leaves.
+        // FIXME aconway 2011-09-19: need to record by member-ID to
+        // requeue if member leaves.
     }
 }
 
@@ -118,12 +119,11 @@ void MessageHandler::dequeue(const std::
     // complete the ack that initiated the dequeue at this point, see
     // BrokerContext::dequeue
 
+    // My own dequeues were processed in the connection thread before multicasting.
     if (sender() != self()) {
         boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
-        // Remove fom the unacked list
-        QueueContext::get(*queue)->dequeue(position);
+        QueuedMessage qm = QueueContext::get(*queue)->dequeue(position);
         BrokerContext::ScopedSuppressReplication ssr;
-        QueuedMessage qm = queue->find(position);
         if (qm.queue) queue->dequeue(0, qm);
     }
 }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Wed Sep 21 14:56:20 2011
@@ -37,8 +37,7 @@ namespace cluster {
 
 // FIXME aconway 2011-09-16: configurable timeout.
 QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
-    : ownership(UNSUBSCRIBED),
-      timer(boost::bind(&QueueContext::timeout, this),
+    : timer(boost::bind(&QueueContext::timeout, this),
             q.getBroker()->getTimer(),
             100*sys::TIME_MSEC),
       queue(q), mcast(m), consumers(0)
@@ -53,7 +52,8 @@ bool isOwner(QueueOwnership o) { return 
 }
 
 // Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(QueueOwnership newOwnership) {
+void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) {
+    assert(before != after);
 
     // Invariants for ownership:
     // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
@@ -61,16 +61,11 @@ void QueueContext::replicaState(QueueOwn
     // SHARED_OWNER <=> timer started, queue started
 
     sys::Mutex::ScopedLock l(lock);
-    QueueOwnership before = ownership;
-    QueueOwnership after = newOwnership;
-    assert(before != after);
-    ownership = newOwnership;
-
     if (!isOwner(before) && isOwner(after)) { // Took ownership
         queue.startConsumers();
         if (after == SHARED_OWNER) timer.start();
     }
-    else if (isOwner(before) && isOwner(after) && before != after) {
+    else if (isOwner(before) && isOwner(after)) {
         // Changed from shared to sole owner or vice versa
         if (after == SOLE_OWNER) timer.stop();
         else timer.start();
@@ -78,7 +73,8 @@ void QueueContext::replicaState(QueueOwn
     // If we lost ownership then the queue and timer will already have
     // been stopped by timeout()
 }
-// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
+
+// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
 
 // Called in connection threads when a consumer is added
 void QueueContext::consume(size_t n) {
@@ -95,13 +91,12 @@ void QueueContext::cancel(size_t n) {
     // When consuming threads are stopped, this->stopped will be called.
     if (n == 0) {
         timer.stop();
-        queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
+        queue.stopConsumers();
     }
 }
 
 // Called in timer thread.
 void QueueContext::timeout() {
-    QPID_LOG(debug, "FIXME QueueContext::timeout");
     // When all threads have stopped, queue will call stopped()
     queue.stopConsumers();
 }
@@ -109,7 +104,6 @@ void QueueContext::timeout() {
 // Callback set up by queue.stopConsumers() called in connection thread.
 // Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
-    QPID_LOG(debug, "FIXME QueueContext::stopped");
     sys::Mutex::ScopedLock l(lock);
     if (consumers == 0)
         mcast.mcast(framing::ClusterQueueUnsubscribeBody(
@@ -134,8 +128,8 @@ void QueueContext::acquire(const broker:
     unacked.put(qm.position, qm);
 }
 
-void QueueContext::dequeue(uint32_t position) {
-    unacked.erase(position);
+broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
+    return unacked.pop(position);
 }
 
 boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h Wed Sep 21 14:56:20 2011
@@ -55,7 +55,7 @@ class QueueContext : public RefCounted {
     ~QueueContext();
 
     /** Replica state has changed, called in deliver thread. */
-    void replicaState(QueueOwnership);
+    void replicaState(QueueOwnership before, QueueOwnership after);
 
     /** Called when queue is stopped, no threads are dispatching.
      * May be called in connection or deliver thread.
@@ -85,11 +85,10 @@ class QueueContext : public RefCounted {
     void acquire(const broker::QueuedMessage& qm);
 
     /** Called by MesageHandler when a message is dequeued. */
-    void dequeue(uint32_t position);
+    broker::QueuedMessage dequeue(uint32_t position);
 
   private:
     sys::Mutex lock;
-    QueueOwnership ownership;
     CountdownTimer timer;
     broker::Queue& queue;       // FIXME aconway 2011-06-08: should be shared/weak ptr?
     Multicaster& mcast;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp Wed Sep 21 14:56:20 2011
@@ -56,6 +56,7 @@ void QueueReplica::subscribe(const Membe
     update(before);
 }
 
+// FIXME aconway 2011-09-20: need to requeue.
 void QueueReplica::unsubscribe(const MemberId& member) {
     QueueOwnership before = getState();
     MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
@@ -76,9 +77,11 @@ void QueueReplica::resubscribe(const Mem
 
 void QueueReplica::update(QueueOwnership before) {
     QueueOwnership after = getState();
-    QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
-             << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
-    if (before != after) context->replicaState(after);
+    if (before != after) {
+        QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
+                 << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
+        context->replicaState(before, after);
+    }
 }
 
 QueueOwnership QueueReplica::getState() const {
@@ -93,7 +96,6 @@ bool QueueReplica::isOwner() const {
 }
 
 bool QueueReplica::isSubscriber(const MemberId& member) const {
-    // FIXME aconway 2011-06-27: linear search here, is it a performance issue?
     return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end();
 }
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1173697&r1=1173696&r2=1173697&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp Wed Sep 21 14:56:20 2011
@@ -108,8 +108,6 @@ class DummyCluster : public broker::Clus
     }
 
     // Queues
-    // FIXME aconway 2011-05-18: update test to exercise empty()
-    virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); }
     virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); }
 
     // Wiring
@@ -249,9 +247,6 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
     BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
-    // FIXME aconway 2011-07-25: empty called once per receiver?
-    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
-    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
     BOOST_CHECK_EQUAL(h.size(), i);
 
     // Message replaced on LVQ



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message