qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1165886 - in /qpid/branches/qpid-2920-1/qpid/cpp: src/ src/qpid/broker/ src/qpid/cluster/exp/ src/qpid/sys/ src/tests/ xml/
Date Tue, 06 Sep 2011 21:47:15 GMT
Author: aconway
Date: Tue Sep  6 21:47:14 2011
New Revision: 1165886

URL: http://svn.apache.org/viewvc?rev=1165886&view=rev
Log:
QPID-2920: Broken checkpoint: passing dequeue mutex test with issues

- handler/context/replica convention (see overview.h doc notes)
- rename BrokerHandler to BrokerContext
- notify Cluster (BrokerContext) on queue stopped or empty (need empty?)
- Implementing Stoppable & stoppable scopes in Queue.cpp
- Move queue ownership logic from dequeue to acquire

Releasing on message count will not work, switch to timer based release.

Added:
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp   (contents, props changed)
      - copied, changed from r1165884, qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h   (contents, props changed)
      - copied, changed from r1165884, qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h   (with props)
Modified:
    qpid/branches/qpid-2920-1/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Cluster.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/NullCluster.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-test-cluster
    qpid/branches/qpid-2920-1/qpid/cpp/xml/cluster.xml

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/Makefile.am?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/Makefile.am Tue Sep  6 21:47:14 2011
@@ -453,6 +453,7 @@ libqpidcommon_la_SOURCES +=			\
   qpid/sys/AtomicValue_gcc.h			\
   qpid/sys/AtomicValue_mutex.h			\
   qpid/sys/BlockingQueue.h			\
+  qpid/sys/BusyThreads.h			\
   qpid/sys/ClusterSafe.h    			\
   qpid/sys/ClusterSafe.cpp  			\
   qpid/sys/Codec.h				\

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk Tue Sep  6 21:47:14 2011
@@ -110,8 +110,8 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/Cpg.h			\
 	qpid/cluster/PollerDispatch.cpp		\
 	qpid/cluster/PollerDispatch.h		\
-	qpid/cluster/exp/BrokerHandler.cpp	\
-	qpid/cluster/exp/BrokerHandler.h	\
+	qpid/cluster/exp/BrokerContext.cpp	\
+	qpid/cluster/exp/BrokerContext.h	\
 	qpid/cluster/exp/BufferFactory.h	\
 	qpid/cluster/exp/Cluster2Plugin.cpp	\
 	qpid/cluster/exp/Core.cpp		\
@@ -124,6 +124,12 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/exp/MessageHandler.h	\
 	qpid/cluster/exp/Multicaster.cpp	\
 	qpid/cluster/exp/Multicaster.h		\
+	qpid/cluster/exp/QueueContext.cpp	\
+	qpid/cluster/exp/QueueContext.h		\
+	qpid/cluster/exp/QueueHandler.cpp	\
+	qpid/cluster/exp/QueueHandler.h		\
+	qpid/cluster/exp/QueueReplica.cpp	\
+	qpid/cluster/exp/QueueReplica.h		\
 	qpid/cluster/exp/WiringHandler.cpp	\
 	qpid/cluster/exp/WiringHandler.h
 

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Cluster.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Cluster.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Cluster.h Tue Sep  6 21:47:14 2011
@@ -80,6 +80,10 @@ class Cluster
     virtual void consume(Queue&, size_t consumerCount) = 0;
     /** A consumer cancels its subscription to a queue */
     virtual void cancel(Queue&, size_t consumerCount) = 0;
+    /** A queue becomes empty */
+    virtual void empty(Queue&) = 0;
+    /** A queue has been stopped */
+    virtual void stopped(Queue&) = 0;
 
     // Wiring
 

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/NullCluster.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/NullCluster.h Tue Sep  6 21:47:14 2011
@@ -49,6 +49,11 @@ class NullCluster : public Cluster
     virtual void consume(Queue&, size_t) {}
     virtual void cancel(Queue&, size_t) {}
 
+    // Queues
+
+    virtual void stopped(Queue&) {}
+    virtual void empty(Queue&) {}
+
     // Wiring
 
     virtual void create(Queue&) {}
@@ -59,6 +64,7 @@ class NullCluster : public Cluster
                       const std::string&, const framing::FieldTable&) {}
     virtual void unbind(Queue&, Exchange&,
                         const std::string&, const framing::FieldTable&) {}
+
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep  6 21:47:14 2011
@@ -112,7 +112,8 @@ Queue::Queue(const string& _name, bool _
     broker(b),
     deleted(false),
     barrier(*this),
-    autoDeleteTimeout(0)
+    autoDeleteTimeout(0),
+    dispatching(boost::bind(&Queue::acquireStopped,this))
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -231,29 +232,40 @@ void Queue::requeue(const QueuedMessage&
     copy.notify();
 }
 
-// Inform the cluster of an acquired message on exit from a function
-// that does the acquiring. ClusterAcquireOnExit is declared *before*
-// any locks are taken. The calling function sets qmsg to the acquired
-// message with a lock held, but the call to Cluster::acquire() will
-// be outside the lock.
-struct ClusterAcquireOnExit {
+/** Mark a scope that acquires a message.
+ *
+ * ClusterAcquireScope is declared before any locks are taken.  The calling
+ * function sets qmsg with the lock held, but the call to
+ * Cluster::acquire() will happen after the lock is released.
+ *
+ * Also marks a Stoppable as busy for the duration of the scope.
+ **/
+struct ClusterAcquireScope {
     Broker* broker;
+    Queue& queue;
     QueuedMessage qmsg;
-    ClusterAcquireOnExit(Broker* b) : broker(b) {}
-    ~ClusterAcquireOnExit() {
-        if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+
+    ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {}
+
+    ~ClusterAcquireScope() {
+        if (broker) {
+            // FIXME aconway 2011-06-27:  Move to QueueContext.
+            // Avoid the indirection via queuename.
+            if (qmsg.queue) broker->getCluster().acquire(qmsg);
+            else broker->getCluster().empty(queue);
+        }
     }
 };
 
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
-    ClusterAcquireOnExit willAcquire(broker); // Outside lock
+    ClusterAcquireScope acquireScope(*this); // Outside lock
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
     if (messages->remove(position, message)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
-        willAcquire.qmsg = message;
+        acquireScope.qmsg = message;
         return true;
     } else {
         QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -300,9 +312,15 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
-        ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+        Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming
+        if (!stopper) {
+            QPID_LOG(trace, "Queue is stopped: " << name);
+            listeners.addListener(c);
+            return NO_MESSAGES;
+        }
+        ClusterAcquireScope acquireScope(*this); // Outside the lock
         Mutex::ScopedLock locker(messageLock);
-        if (messages->empty()) {
+        if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
@@ -317,7 +335,7 @@ Queue::ConsumeCode Queue::consumeNextMes
             if (c->filter(msg.payload)) {
                 if (c->accept(msg.payload)) {
                     m = msg;
-                    willAcquire.qmsg = msg;
+                    acquireScope.qmsg = msg;
                     pop();
                     return CONSUMED;
                 } else {
@@ -374,18 +392,11 @@ void Queue::removeListener(Consumer::sha
 
 bool Queue::dispatch(Consumer::shared_ptr c)
 {
-    Stoppable::Scope doDispatch(dispatching);
-    if (doDispatch) {
-        QueuedMessage msg(this);
-        if (getNextMessage(msg, c)) {
-            c->deliver(msg);
-            return true;
-        } else {
-            return false;
-        }
-    } else { // Dispatching is stopped
-        Mutex::ScopedLock locker(messageLock);
-        listeners.addListener(c); // FIXME aconway 2011-05-05:
+    QueuedMessage msg(this);
+    if (getNextMessage(msg, c)) {
+        c->deliver(msg);
+        return true;
+    } else {
         return false;
     }
 }
@@ -450,10 +461,10 @@ void Queue::cancel(Consumer::shared_ptr 
 }
 
 QueuedMessage Queue::get(){
-    ClusterAcquireOnExit willAcquire(broker); // Outside lock
+    ClusterAcquireScope acquireScope(*this); // Outside lock
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    if (messages->pop(msg)) willAcquire.qmsg = msg;
+    if (messages->pop(msg)) acquireScope.qmsg = msg;
     return msg;
 }
 
@@ -707,7 +718,9 @@ bool Queue::dequeue(TransactionContext* 
             dequeued(msg);
         }
     }
+
     if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
+
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
     bool fp = msg.payload->isForcedPersistent();
@@ -906,6 +919,10 @@ void Queue::notifyDeleted()
     set.notifyAll();
 }
 
+void Queue::acquireStopped() {
+    if (broker) broker->getCluster().stopped(*this);
+}
+
 void Queue::bound(const string& exchange, const string& key,
                   const FieldTable& args)
 {
@@ -1238,7 +1255,7 @@ bool Queue::bind(boost::shared_ptr<Excha
 }
 
 
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
 {
     return broker;
 }
@@ -1276,10 +1293,13 @@ void Queue::UsageBarrier::destroy()
 
 // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
 void Queue::stop() {
+    // FIXME aconway 2011-05-25: rename dispatching - acquiring?
     dispatching.stop();
 }
 
 void Queue::start() {
+    QPID_LOG(critical, "FIXME start context=" << clusterContext);
+    assert(clusterContext);      // FIXME aconway 2011-06-08: XXX
     dispatching.start();
     notifyListener();
 }

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h Tue Sep  6 21:47:14 2011
@@ -21,6 +21,7 @@
  * under the License.
  *
  */
+#include "qpid/log/Statement.h" // FIXME  XXX aconway 2011-06-08: remove
 
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/OwnershipToken.h"
@@ -131,8 +132,9 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
-    // Allow dispatching consumer threads to be stopped.
-    sys::Stoppable dispatching;
+    // Allow dispatching consumer threads to be stopped. Used by cluster
+    sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring?
+    boost::intrusive_ptr<RefCounted> clusterContext;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -180,6 +182,7 @@ class Queue : public boost::enable_share
 
     void checkNotDeleted();
     void notifyDeleted();
+    void acquireStopped();
 
   public:
 
@@ -385,7 +388,7 @@ class Queue : public boost::enable_share
 
     void flush();
 
-    const Broker* getBroker();
+    Broker* getBroker();
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);
@@ -395,13 +398,20 @@ class Queue : public boost::enable_share
      */
     void stop();
 
-    /** Start consumers.
-     *@pre Queue is stopped and idle: no thread in dispatch.
-     */
+    /** Start consumers. */
     void start();
 
-    /** Context data attached and used by cluster code. */
-    boost::intrusive_ptr<qpid::RefCounted> clusterContext;
+    /** Context information used in a cluster. */
+    boost::intrusive_ptr<RefCounted> getClusterContext() {
+        // FIXME aconway 2011-06-08: XXX
+        QPID_LOG(critical, "FIXME q get context " << name << clusterContext);
+        return clusterContext;
+    }
+    void setClusterContext(boost::intrusive_ptr<RefCounted> context) {
+        // FIXME aconway 2011-06-08: XXX
+        clusterContext = context;
+        QPID_LOG(critical, "FIXME q set context " << name << clusterContext);
+    }
 };
 }} // qpid::broker
 

Copied: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (from r1165884, qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?p2=qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp&p1=qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp&r1=1165884&r2=1165886&rev=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Tue Sep  6 21:47:14 2011
@@ -20,17 +20,22 @@
  */
 
 #include "Core.h"
-#include "BrokerHandler.h"
+#include "BrokerContext.h"
+#include "QueueContext.h"
+#include "QueueHandler.h"
 #include "qpid/framing/ClusterMessageRoutingBody.h"
 #include "qpid/framing/ClusterMessageRoutedBody.h"
 #include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/framing/ClusterMessageAcquireBody.h"
 #include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterMessageReleaseBody.h"
 #include "qpid/framing/ClusterWiringCreateQueueBody.h"
 #include "qpid/framing/ClusterWiringCreateExchangeBody.h"
 #include "qpid/framing/ClusterWiringDestroyQueueBody.h"
 #include "qpid/framing/ClusterWiringDestroyExchangeBody.h"
 #include "qpid/framing/ClusterWiringBindBody.h"
 #include "qpid/framing/ClusterWiringUnbindBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/broker/Queue.h"
@@ -54,27 +59,28 @@ QPID_TSS bool tssNoReplicate = false;
 QPID_TSS RoutingId tssRoutingId = 0;
 }
 
-BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() {
     assert(!tssNoReplicate);
     tssNoReplicate = true;
 }
 
-BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() {
     assert(tssNoReplicate);
     tssNoReplicate = false;
 }
 
-BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q)
+    : core(c), queueHandler(q) {}
 
-RoutingId BrokerHandler::nextRoutingId() {
+RoutingId BrokerContext::nextRoutingId() {
     RoutingId id = ++routingId;
     if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
     return id;
 }
 
-void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+void BrokerContext::routing(const boost::intrusive_ptr<Message>&) { }
 
-bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
 {
     if (tssNoReplicate) return true;
     if (!tssRoutingId) {             // This is the first enqueue, so send the message
@@ -93,7 +99,7 @@ bool BrokerHandler::enqueue(Queue& queue
     return false;
 }
 
-void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
     if (tssRoutingId) {             // we enqueued at least one message.
         core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
         // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
@@ -101,28 +107,45 @@ void BrokerHandler::routed(const boost::
     }
 }
 
-void BrokerHandler::dequeue(const broker::QueuedMessage& qm) {
+void BrokerContext::acquire(const broker::QueuedMessage& qm) {
     if (tssNoReplicate) return;
-    // FIXME aconway 2010-10-28: we also need to delay completion of the
-    // ack that caused this dequeue until self-delivery of the mcast below.
-    core.mcast(ClusterMessageDequeueBody(
+    QueueContext::get(*qm.queue)->acquire();
+    core.mcast(ClusterMessageAcquireBody(
                    ProtocolVersion(), qm.queue->getName(), qm.position));
 }
 
-void BrokerHandler::create(broker::Queue& q) {
+// FIXME aconway 2011-05-24: need to handle acquire and release.
+// Dequeue in the wrong place?
+void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
     if (tssNoReplicate) return;
+    core.mcast(ClusterMessageDequeueBody(
+                   ProtocolVersion(), qm.queue->getName(), qm.position));
+}
+
+void BrokerContext::release(const broker::QueuedMessage& ) {
+    // FIXME aconway 2011-05-24: TODO
+}
+
+// FIXME aconway 2011-06-08: should we be using shared_ptr to q here?
+void BrokerContext::create(broker::Queue& q) {
+    if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit
+    // FIXME aconway 2011-06-08: error handling- if already set...
+    // Create local context immediately, queue will be stopped until replicated.
+    boost::intrusive_ptr<QueueContext> context(
+        new QueueContext(q,core.getMulticaster()));
     std::string data(q.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     q.encode(buf);
     core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+    QPID_LOG(critical, "FIXME BrokerContext create " << q.getName() << q.getClusterContext().get());
 }
 
-void BrokerHandler::destroy(broker::Queue& q) {
+void BrokerContext::destroy(broker::Queue& q) {
     if (tssNoReplicate) return;
     core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
 }
 
-void BrokerHandler::create(broker::Exchange& ex) {
+void BrokerContext::create(broker::Exchange& ex) {
     if (tssNoReplicate) return;
     std::string data(ex.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
@@ -130,12 +153,12 @@ void BrokerHandler::create(broker::Excha
     core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
 }
 
-void BrokerHandler::destroy(broker::Exchange& ex) {
+void BrokerContext::destroy(broker::Exchange& ex) {
     if (tssNoReplicate) return;
     core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
 }
 
-void BrokerHandler::bind(broker::Queue& q, broker::Exchange& ex,
+void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex,
                          const std::string& key, const framing::FieldTable& args)
 {
     if (tssNoReplicate) return;
@@ -143,7 +166,7 @@ void BrokerHandler::bind(broker::Queue& 
                    ProtocolVersion(), q.getName(), ex.getName(), key, args));
 }
 
-void BrokerHandler::unbind(broker::Queue& q, broker::Exchange& ex,
+void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
                            const std::string& key, const framing::FieldTable& args)
 {
     if (tssNoReplicate) return;
@@ -151,4 +174,32 @@ void BrokerHandler::unbind(broker::Queue
                    ProtocolVersion(), q.getName(), ex.getName(), key, args));
 }
 
+// n is the number of consumers including the one just added.
+// FIXME aconway 2011-06-27: rename, conflicting terms.
+void BrokerContext::consume(broker::Queue& q, size_t n) {
+    if (n == 1) {
+        // FIXME aconway 2011-06-27: should be on QueueContext for symmetry?
+        core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName()));
+    }
+}
+
+// n is the number of consumers after the cancel.
+void BrokerContext::cancel(broker::Queue& q, size_t n) {
+    if (n == 0) QueueContext::get(q)->unsubscribed();
+}
+
+void BrokerContext::empty(broker::Queue& ) {
+    // FIXME aconway 2011-06-28: is this needed?
+}
+
+void BrokerContext::stopped(broker::Queue& q) {
+    boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
+    // Don't forward the stopped call if the queue does not yet have a cluster context;
+    // this happens when the queue is first created locally.
+    if (qc){
+        QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName());
+        qc->stopped();
+    }
+}
+
 }} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (from r1165884, qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?p2=qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h&p1=qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h&r1=1165884&r2=1165886&rev=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h Tue Sep  6 21:47:14 2011
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_BROKERHANDLER_H
-#define QPID_CLUSTER_BROKERHANDLER_H
+#ifndef QPID_CLUSTER_BROKERCONTEXT_H
+#define QPID_CLUSTER_BROKERCONTEXT_H
 
 /*
  *
@@ -28,13 +28,15 @@
 namespace qpid {
 namespace cluster {
 class Core;
+class QueueHandler;
+class QueueContext;
 
 // TODO aconway 2010-10-19: experimental cluster code.
 
 /**
  * Implements broker::Cluster interface, handles events in broker code.
  */
-class BrokerHandler : public broker::Cluster
+class BrokerContext : public broker::Cluster
 {
   public:
     /** Suppress replication while in scope.
@@ -45,7 +47,7 @@ class BrokerHandler : public broker::Clu
         ~ScopedSuppressReplication();
     };
 
-    BrokerHandler(Core&);
+    BrokerContext(Core&, boost::intrusive_ptr<QueueHandler>);
 
     // FIXME aconway 2010-10-20: implement all points.
 
@@ -54,14 +56,18 @@ class BrokerHandler : public broker::Clu
     void routing(const boost::intrusive_ptr<broker::Message>&);
     bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
     void routed(const boost::intrusive_ptr<broker::Message>&);
-    void acquire(const broker::QueuedMessage&) {}
-    void release(const broker::QueuedMessage&) {}
+    void acquire(const broker::QueuedMessage&);
     void dequeue(const broker::QueuedMessage&);
+    void release(const broker::QueuedMessage&);
 
     // Consumers
 
-    void consume(broker::Queue&, size_t) {}
-    void cancel(broker::Queue&, size_t) {}
+    void consume(broker::Queue&, size_t);
+    void cancel(broker::Queue&, size_t);
+
+    // Queues
+    void empty(broker::Queue&);
+    void stopped(broker::Queue&);
 
     // Wiring
 
@@ -79,8 +85,9 @@ class BrokerHandler : public broker::Clu
     uint32_t nextRoutingId();
 
     Core& core;
+    boost::intrusive_ptr<QueueHandler> queueHandler;
     sys::AtomicValue<uint32_t> routingId;
 };
 }} // namespace qpid::cluster
 
-#endif  /*!QPID_CLUSTER_BROKERHANDLER_H*/
+#endif  /*!QPID_CLUSTER_BROKERCONTEXT_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp Tue Sep  6 21:47:14 2011
@@ -21,9 +21,11 @@
 
 #include "Core.h"
 #include "EventHandler.h"
-#include "BrokerHandler.h"
+#include "BrokerContext.h"
 #include "WiringHandler.h"
 #include "MessageHandler.h"
+#include "QueueContext.h"
+#include "QueueHandler.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SignalHandler.h"
 #include "qpid/framing/AMQFrame.h"
@@ -39,12 +41,17 @@ Core::Core(const Settings& s, broker::Br
     eventHandler(new EventHandler(*this)),
     multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this))
 {
-    eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
-    eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
+    boost::intrusive_ptr<QueueHandler> queueHandler(
+        new QueueHandler(*eventHandler, multicaster));
+    eventHandler->add(queueHandler);
+    eventHandler->add(boost::intrusive_ptr<HandlerBase>(
+                          new WiringHandler(*eventHandler, queueHandler)));
+    eventHandler->add(boost::intrusive_ptr<HandlerBase>(
+                          new MessageHandler(*eventHandler)));
 
-    std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+    std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
     brokerHandler = bh.get();
-    // BrokerHandler belongs to Broker
+    // BrokerContext belongs to Broker
     broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
     eventHandler->start();
     eventHandler->getCpg().join(s.name);
@@ -62,8 +69,7 @@ void Core::fatal() {
 
 void Core::mcast(const framing::AMQBody& body) {
     QPID_LOG(trace, "cluster multicast: " << body);
-    framing::AMQFrame f(body);
-    multicaster.mcast(f);
+    multicaster.mcast(body);
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h Tue Sep  6 21:47:14 2011
@@ -44,7 +44,7 @@ class Broker;
 
 namespace cluster {
 class EventHandler;
-class BrokerHandler;
+class BrokerContext;
 
 /**
  * Cluster core state machine.
@@ -77,16 +77,17 @@ class Core
 
     broker::Broker& getBroker() { return broker; }
     EventHandler& getEventHandler() { return *eventHandler; }
-    BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+    BrokerContext& getBrokerContext() { return *brokerHandler; }
+    Multicaster& getMulticaster() { return multicaster; }
 
     /** Map of messages that are currently being routed.
-     * Used to pass messages being routed from BrokerHandler to MessageHandler
+     * Used to pass messages being routed from BrokerContext to MessageHandler
      */
     RoutingMap& getRoutingMap() { return routingMap; }
   private:
     broker::Broker& broker;
     std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
-    BrokerHandler* brokerHandler; // Handles broker events.
+    BrokerContext* brokerHandler; // Handles broker events.
     RoutingMap routingMap;
     Multicaster multicaster;
 };

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp Tue Sep  6 21:47:14 2011
@@ -41,7 +41,7 @@ EventHandler::EventHandler(Core& c) :
 
 EventHandler::~EventHandler() {}
 
-void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) {
+void EventHandler::add(const boost::intrusive_ptr<HandlerBase>& handler) {
     handlers.push_back(handler);
 }
 

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/EventHandler.h Tue Sep  6 21:47:14 2011
@@ -27,7 +27,7 @@
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/PollerDispatch.h"
 #include "qpid/cluster/types.h"
-#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <vector>
 
 namespace qpid {
@@ -52,7 +52,7 @@ class EventHandler : public Cpg::Handler
     ~EventHandler();
 
     /** Add a handler */
-    void add(const boost::shared_ptr<HandlerBase>&);
+    void add(const boost::intrusive_ptr<HandlerBase>&);
 
     /** Start polling */
     void start();
@@ -87,7 +87,7 @@ class EventHandler : public Cpg::Handler
     MemberId sender;              // sender of current event.
     MemberId self;
 
-    typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers;
+    typedef std::vector<boost::intrusive_ptr<HandlerBase> > Handlers;
     Handlers handlers;
 };
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h Tue Sep  6 21:47:14 2011
@@ -21,6 +21,7 @@
  * under the License.
  *
  */
+#include "qpid/RefCounted.h"
 #include "qpid/cluster/types.h"
 
 namespace qpid {
@@ -35,7 +36,7 @@ class EventHandler;
 /**
  * Base class for handlers of events, children of the EventHandler.
  */
-class HandlerBase
+class HandlerBase : public RefCounted
 {
   public:
     HandlerBase(EventHandler&);

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/LockedMap.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/LockedMap.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/LockedMap.h Tue Sep  6 21:47:14 2011
@@ -54,7 +54,7 @@ class LockedMap
      */
     bool add(const Key& key, const Value& value) {
         sys::RWlock::ScopedWlock w(lock);
-        return map.insert(key, value).second;
+        return map.insert(std::make_pair(key, value)).second;
     }
 
     /** Erase the value associated with key if any. Return true if a value was erased. */

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Tue Sep  6 21:47:14 2011
@@ -21,7 +21,7 @@
 
 #include "Core.h"
 #include "MessageHandler.h"
-#include "BrokerHandler.h"
+#include "BrokerContext.h"
 #include "EventHandler.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Broker.h"
@@ -73,7 +73,7 @@ void MessageHandler::enqueue(RoutingId r
         msg = memberMap[sender()].routingMap[routingId];
     if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
                                        << " failed:  unknown message"));
-    BrokerHandler::ScopedSuppressReplication ssr;
+    BrokerContext::ScopedSuppressReplication ssr;
     queue->deliver(msg);
 }
 
@@ -84,22 +84,40 @@ void MessageHandler::routed(RoutingId ro
         memberMap[sender()].routingMap.erase(routingId);
 }
 
-void MessageHandler::dequeue(const std::string& q, uint32_t position) {
+void MessageHandler::acquire(const std::string& q, uint32_t position) {
+    // Apply acquires made by other members to the local copy of queue q.
+    // My own acquires were already executed in the connection thread.
+    if (sender() != self()) {
+        // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext
+        // by broker for possible re-queuing if a broker leaves.
+        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire failed");
+        QueuedMessage qm;
+        BrokerContext::ScopedSuppressReplication ssr;
+        bool ok = queue->acquireMessageAt(position, qm);
+        (void)ok;                   // Avoid unused variable warnings when asserts are compiled out.
+        assert(ok);
+        assert(qm.position.getValue() == position);
+        assert(qm.payload);
+    }
+}
+
+void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) {
     if (sender() == self()) {
         // FIXME aconway 2010-10-28: we should complete the ack that initiated
-        // the dequeue at this point, see BrokerHandler::dequeue
+        // the dequeue at this point, see BrokerContext::dequeue
         return;
     }
     boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
-    BrokerHandler::ScopedSuppressReplication ssr;
-    QueuedMessage qm;
-    // FIXME aconway 2010-10-28: when we replicate acquires, the acquired
-    // messages will be stored by MessageHandler::acquire.
-    if (queue->acquireMessageAt(position, qm)) {
-        assert(qm.position.getValue() == position);
-        assert(qm.payload);
-        queue->dequeue(0, qm);
-    }
+    BrokerContext::ScopedSuppressReplication ssr;
+    // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext.
+    // Do we need to call this? Review with gsim.
+    // QueuedMessage qm;
+    // Get qm from QueueContext?
+    // queue->dequeue(0, qm);
+}
+
+void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) {
+    // FIXME aconway 2011-05-24:
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Tue Sep  6 21:47:14 2011
@@ -39,7 +39,10 @@ class Queue;
 
 namespace cluster {
 class EventHandler;
-class BrokerHandler;
+class BrokerContext;
+
+// FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue.
+// Make this consistent.
 
 /**
  * Handler for message disposition events.
@@ -55,7 +58,10 @@ class MessageHandler : public framing::A
     void routing(uint32_t routingId, const std::string& message);
     void enqueue(uint32_t routingId, const std::string& queue);
     void routed(uint32_t routingId);
+    void acquire(const std::string& queue, uint32_t position);
     void dequeue(const std::string& queue, uint32_t position);
+    void release(const std::string& queue, uint32_t position);
+
   private:
     struct Member {
         typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Tue Sep  6 21:47:14 2011
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueContext.h"
+#include "Multicaster.h"
+#include "qpid/framing/ClusterQueueResubscribeBody.h"
+#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace cluster {
+
+QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
+    : owner(NOT_OWNER), count(0), queue(q), mcast(m)
+{
+    QPID_LOG(debug, "Assign cluster context to queue " << q.getName());
+    q.stop();                   // Initially stopped. Must call before setClusterContext
+    q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); // Context attaches itself to the queue.
+
+}
+
+// Called by QueueReplica in deliver thread.
+void QueueContext::sharedOwner(size_t limit) {
+    QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get());
+    sys::Mutex::ScopedLock l(lock);
+    count = limit;
+    if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
+    owner = SHARED_OWNER;
+}
+
+// Called by QueueReplica in deliver thread.
+void QueueContext::soleOwner() {
+    QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get());
+    sys::Mutex::ScopedLock l(lock);
+    count = 0;
+    if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
+    owner = SOLE_OWNER;
+}
+
+// Called by BrokerContext in connection thread(s) on acquiring a message
+void QueueContext::acquire() {
+    bool stop = false;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own.
+        QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName()
+                 << " owner="  << owner << " count=" << count);
+        if (owner == SHARED_OWNER) {
+            // Note count could be 0 if there are concurrent calls to acquire.
+            if (count && --count == 0) {
+                stop = true;
+            }
+        }
+    }
+    // FIXME aconway 2011-06-28: could have multiple stop() threads...
+    if (stop) queue.stop();
+}
+
+// Callback set up by queue.stop()
+void QueueContext::stopped() {
+    sys::Mutex::ScopedLock l(lock);
+    if (owner == NOT_OWNER) {
+        mcast.mcast(framing::ClusterQueueUnsubscribeBody(
+                        framing::ProtocolVersion(), queue.getName()));
+    } else {
+        owner = NOT_OWNER;
+        mcast.mcast(framing::ClusterQueueResubscribeBody(
+                        framing::ProtocolVersion(), queue.getName()));
+    }
+}
+
+void QueueContext::unsubscribed() {
+    QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName());
+    queue.stop();
+    sys::Mutex::ScopedLock l(lock);
+    owner = NOT_OWNER;
+}
+
+boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
+    return boost::intrusive_ptr<QueueContext>(
+        static_cast<QueueContext*>(q.getClusterContext().get()));
+}
+
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h Tue Sep  6 21:47:14 2011
@@ -0,0 +1,93 @@
+#ifndef QPID_CLUSTER_EXP_QUEUESTATE_H
+#define QPID_CLUSTER_EXP_QUEUESTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <qpid/RefCounted.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/intrusive_ptr.hpp>
+
+
+// FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ops on
+// class broker::Cluster::Queue. This becomes the cluster context.
+
+namespace qpid {
+namespace broker {
+class Queue;
+}
+namespace cluster {
+
+class Multicaster;
+
+ /**
+ * Queue state that is not replicated to the cluster.
+ * Manages the local queue start/stop status
+ *
+ * Thread safe: Called by connection and dispatch threads.
+ */
+class QueueContext : public RefCounted {
+    // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr?
+  public:
+    QueueContext(broker::Queue& q, Multicaster& m);
+
+    /** Sharing ownership of queue, can acquire up to limit before releasing.
+     * Called in deliver thread.
+     */
+    void sharedOwner(size_t limit);
+
+    /** Sole owner of queue, no limits to acquiring */
+    void soleOwner();
+
+    /**
+     * Count an acquired message against the limit.
+     * Called from connection threads while consuming messages
+     */
+    void acquire();
+
+    /** Called if the queue becomes empty, from connection thread. */
+    void empty();
+
+    /** Called when queue is stopped, connection or deliver thread. */
+    void stopped();
+
+    /** Called when the last subscription to a queue is cancelled */
+    void unsubscribed();
+
+    /** Get the context for a broker queue. */
+    static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
+
+  private:
+    void release();
+
+    sys::Mutex lock;
+    enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner;
+    size_t count;               // Count of dequeues remaining, 0 means no limit.
+    broker::Queue& queue;       // FIXME aconway 2011-06-08: should be shared/weak ptr?
+    Multicaster& mcast;
+
+    // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_QUEUESTATE_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp Tue Sep  6 21:47:14 2011
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueHandler.h"
+#include "EventHandler.h"
+#include "QueueReplica.h"
+#include "QueueContext.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up?
+QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m)
+    : HandlerBase(eh), multicaster(m) {}
+
+bool QueueHandler::invoke(const framing::AMQBody& body) {
+    return framing::invoke(*this, body).wasHandled();
+}
+
+void QueueHandler::subscribe(const std::string& queue) {
+    find(queue)->subscribe(sender());
+}
+void QueueHandler::unsubscribe(const std::string& queue) {
+    find(queue)->unsubscribe(sender());
+}
+void QueueHandler::resubscribe(const std::string& queue) {
+    find(queue)->resubscribe(sender());
+}
+
+void QueueHandler::left(const MemberId& member) {
+    // Unsubscribe for members that leave.
+    // FIXME aconway 2011-06-28: also need to re-queue acquired messages.
+    for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
+        i->second->unsubscribe(member);
+}
+
+// FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle?
+void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
+    // FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler.
+    // FIXME aconway 2011-05-10: assert not already in map.
+
+    // Local queues already have a context, remote queues need one.
+    if (!QueueContext::get(*q))
+        new QueueContext(*q, multicaster); // Context attaches itself to the Queue
+    queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
+        new QueueReplica(q, self()));
+}
+
+boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) {
+    QueueMap::iterator i = queues.find(queue);
+    if (i == queues.end())
+        throw Exception(QPID_MSG("Unknown queue " << queue << " in cluster queue handler"));
+    return i->second;
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h Tue Sep  6 21:47:14 2011
@@ -0,0 +1,82 @@
+#ifndef QPID_CLUSTER_QUEUEHANDLER_H
+#define QPID_CLUSTER_QUEUEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "HandlerBase.h"
+#include "LockedMap.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "boost/shared_ptr.hpp"
+#include "boost/intrusive_ptr.hpp"
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace cluster {
+
+class EventHandler;
+class QueueReplica;
+class Multicaster;
+
+/**
+ * Handler for queue subscription events.
+ *
+ * THREAD UNSAFE: only accessed in cluster deliver thread, on delivery
+ * of queue controls and also from WiringHandler on delivery of queue
+ * create.
+ */
+class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
+                     public HandlerBase
+{
+  public:
+    QueueHandler(EventHandler&, Multicaster&);
+
+    bool invoke(const framing::AMQBody& body);
+
+    // Events
+    void subscribe(const std::string& queue);
+    void unsubscribe(const std::string& queue);
+    void resubscribe(const std::string& queue);
+    void left(const MemberId&);
+
+    void add(boost::shared_ptr<broker::Queue>);
+
+    // NB: These functions are called in connection threads, not deliver threads.
+    void acquired(const broker::QueuedMessage& qm);
+    void empty(const broker::Queue& q);
+
+  private:
+    typedef std::map<std::string, boost::intrusive_ptr<QueueReplica> > QueueMap;
+
+    boost::intrusive_ptr<QueueReplica> find(const std::string& queue);
+
+    QueueMap queues;
+    Multicaster& multicaster;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_QUEUEHANDLER_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp Tue Sep  6 21:47:14 2011
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueReplica.h"
+#include "QueueContext.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
+                           const MemberId& self_)
+    : queue(q), self(self_), context(QueueContext::get(*q))
+{
+    // q is initially stopped.
+}
+
+struct PrintSubscribers {
+    const QueueReplica::MemberQueue& mq;
+    PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {}
+};
+
+std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
+    copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator<MemberId>(o, " "));
+    return o;
+}
+
+std::ostream& operator<<(std::ostream& o, QueueReplica::State s) {
+    static const char* const tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
+    return o << tags[s];        // tags[] is indexed by State; order must match the enum.
+}
+
+std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
+    o << qr.queue->getName() << "(" << qr.getState() << "): "
+      <<  PrintSubscribers(qr.subscribers);
+    return o;
+}
+
+// FIXME aconway 2011-05-17: error handling for asserts.
+
+void QueueReplica::subscribe(const MemberId& member) {
+    State before = getState();
+    subscribers.push_back(member);
+    update(before);
+}
+
+void QueueReplica::unsubscribe(const MemberId& member) {
+    State before = getState();
+    MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
+    if (i != subscribers.end()) {
+        subscribers.erase(i, subscribers.end());
+        update(before);
+    }
+}
+
+void QueueReplica::resubscribe(const MemberId& member) {
+    assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
+    State before = getState();
+    subscribers.pop_front();
+    subscribers.push_back(member);
+    update(before);
+}
+
+void QueueReplica::update(State before) {
+    const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable
+    State after = getState();
+    if (before == after) return;
+    QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
+    switch (after) {
+      case UNSUBSCRIBED: break;
+      case SUBSCRIBED: break;
+      case SOLE_OWNER:
+        context->soleOwner();
+        break;
+      case SHARED_OWNER:
+        context->sharedOwner(acquireLimit);
+        break;
+    }
+}
+
+QueueReplica::State QueueReplica::getState() const {
+    if (isOwner())
+        return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
+    return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
+}
+
+bool QueueReplica::isOwner() const {
+    return !subscribers.empty() && subscribers.front() == self;
+}
+
+bool QueueReplica::isSubscriber(const MemberId& member) const {
+    // FIXME aconway 2011-06-27: linear search here, is it a performance issue?
+    return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end();
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h Tue Sep  6 21:47:14 2011
@@ -0,0 +1,85 @@
+#ifndef QPID_CLUSTER_QUEUEMODEL_H
+#define QPID_CLUSTER_QUEUEMODEL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/cluster/types.h"
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+}
+
+namespace cluster {
+class QueueHandler;
+class QueueContext;
+
+/**
+ * Queue state that is replicated among all cluster members.
+ *
+ * Handles queue subscription controls by starting/stopping the queue.
+ *
+ * THREAD UNSAFE: only used in cluster deliver thread, on delivery
+ * of queue controls and also from WiringHandler on delivery of queue
+ * create.
+ */
+class QueueReplica : public RefCounted
+{
+  public:
+    QueueReplica(boost::shared_ptr<broker::Queue> , const MemberId& );
+    void subscribe(const MemberId&);    // Append member to the subscriber queue.
+    void unsubscribe(const MemberId&);  // Remove member from the subscriber queue.
+    void resubscribe(const MemberId&);  // Move the front subscriber to the back.
+
+  private:
+    enum State {
+        UNSUBSCRIBED,           // self is not in the subscriber queue.
+        SUBSCRIBED,             // self is subscribed but not at the front.
+        SOLE_OWNER,             // self is at the front and is the only subscriber.
+        SHARED_OWNER            // self is at the front and there are other subscribers.
+    };
+
+    friend struct PrintSubscribers; // Defined as a struct in QueueReplica.cpp.
+    friend std::ostream& operator<<(std::ostream&, State);
+    friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
+
+    typedef std::deque<MemberId> MemberQueue;
+
+    boost::shared_ptr<broker::Queue> queue;
+    MemberQueue subscribers;    // This member is the owner when it is at the front.
+    MemberId self;
+    boost::intrusive_ptr<QueueContext> context;
+
+    State getState() const;
+    bool isOwner() const;
+    bool isSubscriber(const MemberId&) const;
+    void update(State before);  // Notify context if the state changed from 'before'.
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_QUEUEMODEL_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Tue Sep  6 21:47:14 2011
@@ -22,7 +22,8 @@
 #include "Core.h"
 #include "WiringHandler.h"
 #include "EventHandler.h"
-#include "BrokerHandler.h"
+#include "QueueHandler.h"
+#include "BrokerContext.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Queue.h"
@@ -32,18 +33,20 @@
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
-#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 namespace cluster {
 using namespace broker;
 using framing::FieldTable;
 
-WiringHandler::WiringHandler(EventHandler& e) :
+WiringHandler::WiringHandler(EventHandler& e,
+                             const boost::intrusive_ptr<QueueHandler>& qh) :
     HandlerBase(e),
     broker(e.getCore().getBroker()),
     recovery(broker.getQueues(), broker.getExchanges(),
-             broker.getLinks(), broker.getDtxManager())
+             broker.getLinks(), broker.getDtxManager()),
+    queueHandler(qh)
 {}
 
 bool WiringHandler::invoke(const framing::AMQBody& body) {
@@ -51,24 +54,39 @@ bool WiringHandler::invoke(const framing
 }
 
 void WiringHandler::createQueue(const std::string& data) {
-    if (sender() == self()) return;
-    BrokerHandler::ScopedSuppressReplication ssr;
-    framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
-    // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
-    RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf);
-    QPID_LOG(debug, "cluster: create queue " << queue->getName());
+    // FIXME aconway 2011-05-25: Needs async completion.
+    std::string name;
+    if (sender() != self()) {   // Created by another member, need to create locally.
+        BrokerContext::ScopedSuppressReplication ssr;
+        framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+        // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+        RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf);
+        name = rq->getName();
+    }
+    else {                      // Created locally, Queue and QueueContext already exist.
+        framing::Buffer buffer(const_cast<char*>(&data[0]), data.size());
+        // FIXME aconway 2011-05-10: implicit knowledge of queue encoding.
+        buffer.getShortString(name);
+    }
+    boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
+    assert(q);                  // FIXME aconway 2011-05-10: error handling.
+    // TODO aconway 2011-05-10: if we implement multi-group for queues then
+    // this call is a problem: comes from wiring delivery thread, not queues.
+    // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers..
+    queueHandler->add(q);
+    QPID_LOG(debug, "cluster: create queue " << q->getName());
 }
 
 void WiringHandler::destroyQueue(const std::string& name) {
     if (sender() == self()) return;
     QPID_LOG(debug, "cluster: destroy queue " << name);
-    BrokerHandler::ScopedSuppressReplication ssr;
+    BrokerContext::ScopedSuppressReplication ssr;
     broker.deleteQueue(name, std::string(), std::string());
 }
 
 void WiringHandler::createExchange(const std::string& data) {
     if (sender() == self()) return;
-    BrokerHandler::ScopedSuppressReplication ssr;
+    BrokerContext::ScopedSuppressReplication ssr;
     framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
     // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
     RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf);
@@ -78,7 +96,7 @@ void WiringHandler::createExchange(const
 void WiringHandler::destroyExchange(const std::string& name) {
     if (sender() == self()) return;
     QPID_LOG(debug, "cluster: destroy exchange " << name);
-    BrokerHandler::ScopedSuppressReplication ssr;
+    BrokerContext::ScopedSuppressReplication ssr;
     broker.getExchanges().destroy(name);
 }
 
@@ -91,7 +109,7 @@ void WiringHandler::bind(
              << " exchange=" << exchangeName
              << " key=" << routingKey
              << " arguments=" << arguments);
-    BrokerHandler::ScopedSuppressReplication ssr;
+    BrokerContext::ScopedSuppressReplication ssr;
     broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string());
 }
 
@@ -104,7 +122,7 @@ void WiringHandler::unbind(
              << " exchange=" << exchangeName
              << " key=" << routingKey
              << " arguments=" << arguments);
-    BrokerHandler::ScopedSuppressReplication ssr;
+    BrokerContext::ScopedSuppressReplication ssr;
     broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string());
 }
 

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Tue Sep  6 21:47:14 2011
@@ -42,7 +42,7 @@ class Broker;
 
 namespace cluster {
 class EventHandler;
-
+class QueueHandler;
 
 /**
  * Handler for wiring disposition events.
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AM
                       public HandlerBase
 {
   public:
-    WiringHandler(EventHandler&);
+    WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh);
 
     bool invoke(const framing::AMQBody& body);
 
@@ -66,8 +66,10 @@ class WiringHandler : public framing::AM
 
 
   private:
+
     broker::Broker& broker;
     broker::RecoveryManagerImpl recovery;
+    boost::intrusive_ptr<QueueHandler> queueHandler;
 };
 
 }} // namespace qpid::cluster

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h?rev=1165886&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h Tue Sep  6 21:47:14 2011
@@ -0,0 +1,13 @@
+// This file is documentation in doxygen format.
+/**
+
+<h1>New cluster implementation overview</h1>
+
+There are 3 areas indicated by a suffix on class names:
+
+- Replica: State that is replicated to the entire cluster. Only called by Handlers in the deliver thread.
+- Context: State that is private to this member. Called by both Replica and broker objects in deliver and connection threads.
+- Handler: Dispatch CPG messages by calling Replica objects in the deliver thread.
+
+
+**/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/overview.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h Tue Sep  6 21:47:14 2011
@@ -21,17 +21,27 @@
  * under the License.
  *
  */
+
+#include <boost/function.hpp>
+
 namespace qpid {
 namespace sys {
 
+// FIXME aconway 2011-05-25: needs better name
+
 /**
  * An activity that may be executed by multiple threads, and can be stopped.
- * Stopping prevents new threads from entering and waits till exiting busy threads leave.
+ *
+ * Stopping prevents new threads from entering and calls a callback
+ * when all busy threads leave.
  */
 class Stoppable {
   public:
-    Stoppable() : busy(0), stopped(false) {}
-    ~Stoppable() { stop(); }
+    /**
+     *@param stoppedCallback: called when all threads have stopped.
+     */
+    Stoppable(boost::function<void()> stoppedCallback)
+        : busy(0), stopped(false), notify(stoppedCallback) {}
 
     /** Mark the scope of a busy thread like this:
      * <pre>
@@ -52,38 +62,49 @@ class Stoppable {
 
   friend class Scope;
 
-    /** Mark  stopped, wait for all threads to leave their busy scope. */
+    /**
+     * Set state to "stopped", so no new threads can enter.
+     * Call notify function when all busy threads have left.
+     */
+    // FIXME aconway 2011-06-27: not guaranteed that stopped will be called,
+    // deadlock?
     void stop() {
         sys::Monitor::ScopedLock l(lock);
         stopped = true;
-        while (busy > 0) lock.wait();
+        check();
     }
 
-    /** Set the state to started.
-     *@pre state is stopped and no theads are busy.
+    /** Set the state to "started", allow threads to enter.
      */
     void start() {
         sys::Monitor::ScopedLock l(lock);
-        assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
         stopped = false;
     }
 
-  private:
-    uint busy;
-    bool stopped;
-    sys::Monitor lock;
-
+    // Busy thread enters scope
     bool enter() {
         sys::Monitor::ScopedLock l(lock);
         if (!stopped) ++busy;
         return !stopped;
     }
 
+    // Busy thread exits scope
     void exit() {
         sys::Monitor::ScopedLock l(lock);
         assert(busy > 0);
-        if (--busy == 0) lock.notifyAll();
+        --busy;
+        check();
+    }
+
+  private:
+    void check() {
+        if (stopped && busy == 0 && notify) notify();
     }
+
+    uint busy;
+    bool stopped;
+    sys::Monitor lock;
+    boost::function< void() > notify;
 };
 
 }} // namespace qpid::sys

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp Tue Sep  6 21:47:14 2011
@@ -56,13 +56,18 @@ class DummyCluster : public broker::Clus
      */
     bool isRouting;
 
+    // Record a QueuedMessage
     void recordQm(const string& op, const broker::QueuedMessage& qm) {
         history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
                     % qm.position % qm.payload->getFrames().getContent()).str();
     }
+
+    // Record a message
     void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
         history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
     }
+
+    // Record a string
     void recordStr(const string& op, const string& name) {
         history += (format("%s(%s)") % op % name).str();
     }
@@ -102,6 +107,11 @@ class DummyCluster : public broker::Clus
         history += (format("cancel(%s, %d)") % q.getName() % n).str();
     }
 
+    // Queues
+    // FIXME aconway 2011-05-18: update test to exercise empty()
+    virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); }
+    virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); }
+
     // Wiring
 
     virtual void create(broker::Queue& q) { recordStr("createq", q.getName()); }
@@ -230,7 +240,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     h.clear();
     i = 0;
     m = Message("t");
-    m.setTtl(Duration(1));                // Timeout 1ms
+    m.setTtl(Duration(1));      // Timeout 1ms
     sender.send(m);
     usleep(2000);               // Sleep 2ms
     bool received = receiver.fetch(m, Duration::IMMEDIATE);
@@ -239,6 +249,10 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
     BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+    // Note: empty is called once for each receiver.
+    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
     BOOST_CHECK_EQUAL(h.size(), i);
 
     // Message replaced on LVQ

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py Tue Sep  6 21:47:14 2011
@@ -33,8 +33,27 @@ log = getLogger("qpid.cluster_tests")
 class Cluster2Tests(BrokerTest):
     """Tests for new cluster code."""
 
-    def verify_content(self, content, receiver):
-        for c in content: self.assertEqual(c, receiver.fetch(1).content)
+    def queue_exists(self, queue, connection):
+        s = connection.session()
+        try:
+            s.sender(queue)
+            return True
+        except qpid.messaging.exceptions.NotFound:
+            return False
+
+    # FIXME aconway 2011-06-22: needed to compensate for
+    # async wiring in early cluster2 prototype
+    def wait_for_queue(self, queue, connections, timeout=10):
+        deadline = time.time() + timeout  # One absolute deadline shared by all connections.
+        for c in connections:
+            while not self.queue_exists(queue,c):
+                if time.time() > deadline: self.fail("Time out in wait_for_queue(%s)"%queue)
+                time.sleep(0.01)
+
+    # FIXME aconway 2011-05-17: remove, use assert_browse.
+    def verify_content(self, expect, receiver):
+        actual = [receiver.fetch(1).content for x in expect]
+        self.assertEqual(expect, actual)
         self.assertRaises(Empty, receiver.fetch, 0)
 
     def test_message_enqueue(self):
@@ -74,12 +93,15 @@ class Cluster2Tests(BrokerTest):
         s0 = sn0.sender("q;{create:always,delete:always}")
         r0 = sn0.receiver("q")
         sn1 = cluster[1].connect().session()
-        r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+        r1 = sn1.receiver("q;{create:always}")
 
         content = ["a","b","c"]
         for m in content: s0.send(Message(m))
-         # Verify enqueued on cluster[1]
+        # Verify enqueued on members 0 and 1
+        # FIXME aconway 2011-05-13:
+        self.verify_content(content, sn0.receiver("q;{mode:browse}"))
         self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+
         # Dequeue on cluster[0]
         self.assertEqual(r0.fetch(1).content, "a")
         sn0.acknowledge(sync=True)
@@ -114,3 +136,40 @@ class Cluster2Tests(BrokerTest):
         self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
 
         # FIXME aconway 2010-10-29: test unbind, may need to use old API.
+
+    def test_dequeue_mutex(self):
+        """Ensure that one and only one consumer receives each dequeued message."""
+        class Receiver(Thread):
+            def __init__(self, session):
+                self.session = session
+                self.receiver = session.receiver("q")
+                self.messages = []
+                Thread.__init__(self)
+
+            def run(self):
+                try:
+                    while True:
+                        self.messages.append(self.receiver.fetch(1))
+                        self.session.acknowledge()
+                except Empty: pass
+
+        cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t
+        connections = [ b.connect() for  b in cluster]
+        sessions = [ c.session() for c in connections ]
+        sender = sessions[0].sender("q;{create:always}")
+        self.wait_for_queue("q", connections)
+
+        receivers = [ Receiver(s) for s in sessions ]
+        for r in receivers: r.start()
+
+        n = 0
+        t = time.time() + 1             # Send for 1 second.
+        while time.time() < t:
+            sender.send(str(n))
+            n += 1
+        for r in receivers: r.join();
+        print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17:
+        for r in receivers: assert len(r.messages) # At least one message to each
+        messages = [int(m.content) for r in receivers for m in r.messages ]
+        messages.sort()
+        self.assertEqual(range(n), messages)

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-test-cluster
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-test-cluster?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-test-cluster (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-test-cluster Tue Sep  6 21:47:14 2011
@@ -28,7 +28,7 @@ Options:
              Default is $DEFAULT_ENV.
  -c CONFIG   Use CONFIG as qpidd config file. Copies CONFIG to each host.
              Default is $DEFAULT_CONF
- -d          Delete data-dir and log file before starting broker.	     
+ -d          Delete data-dir and log file before starting broker.
 "
     exit 1
 }
@@ -82,6 +82,7 @@ do_start() {
 }
 
 do_stop() {
+
     for h in $HOSTS; do
 	ssh $h "$SOURCE_ENV qpidd -q --no-module-dir --no-data-dir $QPIDD_ARGS"
     done

Modified: qpid/branches/qpid-2920-1/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/xml/cluster.xml?rev=1165886&r1=1165885&r2=1165886&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/xml/cluster.xml Tue Sep  6 21:47:14 2011
@@ -78,6 +78,10 @@
       <field name="left" type="vbin16"/> <!-- packed member-id array -->
     </control>
 
+    <control name="message-expired" code="0x12">
+      <field name="id" type="uint64"/>
+    </control>
+
     <domain name="error-type" type="uint8" label="Types of error">
       <enum>
 	<choice name="none" value="0"/>
@@ -337,6 +341,7 @@
       <field name="message" type="str32"/>
     </control>
 
+    <!--  FIXME aconway 2011-04-27: reference queues by index, not name -->
     <control name="enqueue" code="0x2">
       <field name="routing-id" type="uint32"/>
       <field name="queue" type="queue.name"/>
@@ -346,10 +351,22 @@
       <field name="routing-id" type="uint32"/>
     </control>
 
-    <control name="dequeue" code="0x4">
+    <!-- FIXME aconway 2011-04-27: review queue positions vs. global message IDs -->
+    <control name="acquire" code="0x4">
+      <field name="queue" type="queue.name"/>
+      <field name="position" type="uint32"/>
+    </control>
+
+    <control name="dequeue" code="0x5">
       <field name="queue" type="queue.name"/>
       <field name="position" type="uint32"/>
     </control>
+
+    <control name="release" code="0x6">
+      <field name="queue" type="queue.name"/>
+      <field name="position" type="uint32"/>
+    </control>
+
   </class>
 
   <class name="cluster-wiring" code="0x83">
@@ -384,4 +401,26 @@
     </control>
 
   </class>
+
+  <!-- Manage subscriptions to a queue.
+
+       Each queue has a "subscriber queue" of members waiting to take
+       messages from the queue.  The member at the front of the queue
+       is the only one allowed to take messages. -->
+
+  <class name="cluster-queue" code="0x84">
+    <!-- Join at the back of the subscriber queue -->
+    <control name="subscribe" code="0x1">
+      <field name="queue" type="queue.name"/>
+    </control>
+    <!-- Leave the subscriber queue -->
+    <control name="unsubscribe" code="0x2">
+      <field name="queue" type="queue.name"/>
+    </control>
+    <!-- Move the member at the front to the back. -->
+    <control name="resubscribe" code="0x3">
+      <field name="queue" type="queue.name"/>
+    </control>
+  </class>
+
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message