qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1197748 - in /qpid/branches/qpid-2920-active/qpid/cpp/src: ./ qpid/cluster/exp/
Date Fri, 04 Nov 2011 20:26:52 GMT
Author: aconway
Date: Fri Nov  4 20:26:52 2011
New Revision: 1197748

URL: http://svn.apache.org/viewvc?rev=1197748&view=rev
Log:
QPID-2920: Introduced "ticks" for timing.

- Use fewer timer tasks - 1 per group
- Dispatch timer tasks in separate thread context per group.

Added:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp   (with props)
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h   (with props)
Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk Fri Nov  4 20:26:52 2011
@@ -115,23 +115,21 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/exp/BrokerContext.h	\
 	qpid/cluster/exp/BufferFactory.h	\
 	qpid/cluster/exp/Cluster2Plugin.cpp	\
-	qpid/cluster/exp/CountdownTimer.h	\
 	qpid/cluster/exp/Core.cpp		\
 	qpid/cluster/exp/Core.h			\
+	qpid/cluster/exp/CountdownTimer.h	\
 	qpid/cluster/exp/EventHandler.cpp	\
 	qpid/cluster/exp/EventHandler.h		\
 	qpid/cluster/exp/Group.cpp		\
 	qpid/cluster/exp/Group.h		\
-	qpid/cluster/exp/hash.cpp		\
-	qpid/cluster/exp/hash.h			\
 	qpid/cluster/exp/HandlerBase.cpp	\
 	qpid/cluster/exp/HandlerBase.h		\
 	qpid/cluster/exp/MessageBuilders.cpp	\
 	qpid/cluster/exp/MessageBuilders.h	\
-	qpid/cluster/exp/MessageHolder.cpp	\
-	qpid/cluster/exp/MessageHolder.h	\
 	qpid/cluster/exp/MessageHandler.cpp	\
 	qpid/cluster/exp/MessageHandler.h	\
+	qpid/cluster/exp/MessageHolder.cpp	\
+	qpid/cluster/exp/MessageHolder.h	\
 	qpid/cluster/exp/Multicaster.cpp	\
 	qpid/cluster/exp/Multicaster.h		\
 	qpid/cluster/exp/QueueContext.cpp	\
@@ -142,9 +140,13 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/exp/QueueReplica.h		\
 	qpid/cluster/exp/Settings.cpp		\
 	qpid/cluster/exp/Settings.h		\
+	qpid/cluster/exp/Ticker.h		\
+	qpid/cluster/exp/Ticker.cpp		\
 	qpid/cluster/exp/UniqueIds.h		\
 	qpid/cluster/exp/WiringHandler.cpp	\
-	qpid/cluster/exp/WiringHandler.h
+	qpid/cluster/exp/WiringHandler.h	\
+	qpid/cluster/exp/hash.cpp		\
+	qpid/cluster/exp/hash.h
 
 
 # The watchdog plugin and helper executable

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Fri Nov  4 20:26:52 2011
@@ -19,11 +19,11 @@
  *
  */
 
-#include "Core.h"
 #include "BrokerContext.h"
-#include "QueueContext.h"
-#include "Multicaster.h"
+#include "Core.h"
 #include "MessageHolder.h"
+#include "Multicaster.h"
+#include "QueueContext.h"
 #include "hash.h"
 #include "qpid/framing/ClusterMessageEnqueueBody.h"
 #include "qpid/framing/ClusterMessageAcquireBody.h"
@@ -136,7 +136,7 @@ void BrokerContext::create(broker::Queue
     if (!tssReplicate) return;
     assert(!QueueContext::get(q));
     boost::intrusive_ptr<QueueContext> context(
-        new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName())));
+        new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks));
     std::string data(q.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     q.encode(buf);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h Fri Nov  4 20:26:52 2011
@@ -29,6 +29,7 @@ namespace qpid {
 namespace cluster {
 class Core;
 class QueueContext;
+class Multicaster;
 
 // TODO aconway 2010-10-19: experimental cluster code.
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp Fri Nov  4 20:26:52 2011
@@ -35,8 +35,9 @@ struct Cluster2Plugin : public Plugin {
         Opts(Settings& s) : Options("Cluster Options"), settings(s) {
             addOptions()
                 ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
-                ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.")
-                ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.");
+                ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.")
+                ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.")
+                ("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue.");
                 // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
                 // FIXME aconway 2011-10-05: rename to final option names.
         }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp Fri Nov  4 20:26:52 2011
@@ -19,13 +19,14 @@
  *
  */
 
+#include "BrokerContext.h"
 #include "Core.h"
 #include "EventHandler.h"
-#include "BrokerContext.h"
-#include "WiringHandler.h"
 #include "MessageHandler.h"
 #include "QueueContext.h"
 #include "QueueHandler.h"
+#include "WiringHandler.h"
+#include "hash.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SignalHandler.h"
 #include "qpid/framing/AMQFrame.h"
@@ -47,7 +48,7 @@ Core::Core(const Settings& s, broker::Br
         std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
         groups.push_back(new Group(*this));
         boost::intrusive_ptr<Group> group(groups.back());
- 
+
         EventHandler& eh(group->getEventHandler());
         typedef boost::intrusive_ptr<HandlerBase>  HandlerBasePtr;
         boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings));
@@ -66,8 +67,6 @@ Core::Core(const Settings& s, broker::Br
     }
     QPID_LOG(notice, "cluster: joined cluster " << s.name
              << ", member-id="<< groups[0]->getEventHandler().getSelf());
-    QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us "
-             << " concurrency=" << s.concurrency);
 }
 
 void Core::initialize() {}
@@ -80,4 +79,8 @@ Group& Core::getGroup(size_t hashValue) 
     return *groups[hashValue % groups.size()];
 }
 
+Group& Core::getGroup(const std::string& q) {
+    return getGroup(hashof(q));
+}
+
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h Fri Nov  4 20:26:52 2011
@@ -76,8 +76,8 @@ class Core
 
     const Settings& getSettings() const { return settings; }
 
-    /** Get group by hash value. */
     Group& getGroup(size_t hashValue);
+    Group& getGroup(const std::string& queueName);
 
   private:
     broker::Broker& broker;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp Fri Nov  4 20:26:52 2011
@@ -18,13 +18,13 @@
  * under the License.
  *
  */
-#include "Group.h"
 #include "Core.h"
 #include "EventHandler.h"
-#include "Multicaster.h"
-#include "MessageHolder.h"
+#include "Group.h"
 #include "MessageBuilders.h"
-
+#include "MessageHolder.h"
+#include "Multicaster.h"
+#include "Ticker.h"
 #include "qpid/broker/Broker.h"
 
 namespace qpid {
@@ -44,7 +44,10 @@ Group::Group(Core& core) :
                         core.getBroker().getPoller(),
                         boost::bind(&Core::fatal, &core))),
     messageHolder(new MessageHolder()),
-    messageBuilders(new MessageBuilders(&core.getBroker().getStore()))
+    messageBuilders(new MessageBuilders(&core.getBroker().getStore())),
+    ticker(new Ticker(core.getSettings().getTick(),
+                      core.getBroker().getTimer(),
+                      core.getBroker().getPoller()))
 {}
 
 Group::~Group() {}

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h Fri Nov  4 20:26:52 2011
@@ -39,10 +39,12 @@ class EventHandler;
 class Multicaster;
 class MessageBuilders;
 class MessageHolder;
+class Ticker;
 
 /**
- * A CPG instance with an event handler and a multi-caster, 
- * along with all the per-group handler objects.
+ * Resources used by a group of queues. Includes a CPG instance with
+ * an event handler and a multi-caster, along with all the per-group
+ * handler objects and a Ticker.
  */
 class Group : public RefCounted
 {
@@ -54,6 +56,7 @@ class Group : public RefCounted
     Multicaster& getMulticaster() { return *multicaster; }
     MessageHolder& getMessageHolder() { return *messageHolder; }
     MessageBuilders& getMessageBuilders() { return *messageBuilders; }
+    Ticker& getTicker() { return *ticker; }
 
     void mcast(const framing::AMQBody&);
     void mcast(const framing::AMQFrame&);
@@ -62,6 +65,7 @@ class Group : public RefCounted
     std::auto_ptr<Multicaster> multicaster;
     std::auto_ptr<MessageHolder> messageHolder;
     std::auto_ptr<MessageBuilders> messageBuilders;
+    std::auto_ptr<Ticker> ticker;
 };
 
 }} // namespace qpid::cluster::exp

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Fri Nov  4 20:26:52 2011
@@ -20,11 +20,12 @@
  *
  */
 
-#include "QueueContext.h"
+#include "BrokerContext.h"
+#include "Group.h"
 #include "Multicaster.h"
-#include "qpid/cluster/types.h"
-#include "BrokerContext.h"      // for ScopedSuppressReplication
+#include "QueueContext.h"
 #include "hash.h"
+#include "qpid/cluster/types.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/ClusterQueueResubscribeBody.h"
 #include "qpid/framing/ClusterQueueSubscribeBody.h"
@@ -37,14 +38,14 @@
 namespace qpid {
 namespace cluster {
 
-QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m)
-    : timer(boost::bind(&QueueContext::timeout, this),
-            q.getBroker()->getTimer(),
-            consumeLock),
-      queue(q), mcast(m), consumers(0), hash(hashof(q.getName()))
+QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
+    : consumers(0), consuming(true), ticks(0),
+      queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
+      maxTicks(maxTicks_)
 {
     q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
     q.stopConsumers();          // Stop queue initially.
+    g.getTicker().add(this);
 }
 
 QueueContext::~QueueContext() {}
@@ -54,72 +55,74 @@ bool isOwner(QueueOwnership o) { return 
 }
 
 // Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(
-    QueueOwnership before, QueueOwnership after, bool selfDelivered)
+void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
 {
-    // No lock, this function does not touch any member variables.
-
-    // Invariants for ownership:
-    // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
-    // SOLE_OWNER <=> timer stopped, queue started
-    // SHARED_OWNER <=> timer started, queue started
-
-    // Interested in state changes and my own events which lead to
-    // ownership.
-    if ((before != after || selfDelivered) && isOwner(after)) {
-        QPID_LOG(trace, "cluster: start consumers on " << queue.getName() << ", timer "
-                 << (after==SHARED_OWNER? "start" : "stop"));
-        queue.startConsumers();
-        if (after == SHARED_OWNER) timer.start();
-        else timer.stop();
+    // Interested in state changes which lead to ownership.
+    // We voluntarily give up ownership before multicasting
+    // the state change so we don't need to handle transitions
+    // that lead to non-ownership.
+    if (before != after && isOwner(after)) {
+        bool start = false;
+        {
+            sys::Mutex::ScopedLock l(lock);
+            start = !consuming;
+            consuming = true;
+            ticks = 0;
+        }
+        if (start) queue.startConsumers();
     }
-
-    // If we lost ownership then the queue and timer will already have
-    // been stopped by timeout()
 }
 
 // FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
 
 // Called in broker threads when a consumer is added
 void QueueContext::consume(size_t n) {
-    sys::Mutex::ScopedLock l(lock);
-    consumers = n;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        consumers = n;
+    }
     if (n == 1) mcast.mcast(
         framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
 }
 
 // Called in broker threads when a consumer is cancelled
 void QueueContext::cancel(size_t n) {
-    sys::Mutex::ScopedLock l(lock);
-    consumers = n;
-    // When consuming threads are stopped, this->stopped will be called.
-    if (n == 0) {
-        QPID_LOG(trace, "cluster: all consumers canceled on " << queue.getName());
-        timer.stop();
-        queue.stopConsumers();
+    bool stop = false;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        consumers = n;
+        stop = (n == 0 && consuming);
     }
+    if (stop) queue.stopConsumers();
 }
 
-// Called in timer thread.
-void QueueContext::timeout() {
+// Called in Ticker thread.
+void QueueContext::tick() {
+    bool stop = false;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        stop = (consuming && ++ticks >= maxTicks);
+    }
     // When all threads have stopped, queue will call stopped()
-    QPID_LOG(trace, "cluster: lock timeout on " << queue.getName());
-    queue.stopConsumers();
+    if (stop) queue.stopConsumers();
 }
 
 // Callback set up by queue.stopConsumers() called in connection or timer thread.
 // Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
-    sys::Mutex::ScopedLock l(lock);
-    QPID_LOG(trace, "cluster: stopped consumers, "
-             << (consumers == 0 ? "unsubscribe" : "resubscribe")
-             << " to " << queue.getName());
-    if (consumers == 0)
-        mcast.mcast(framing::ClusterQueueUnsubscribeBody(
-                        framing::ProtocolVersion(), queue.getName()));
-    else            // FIXME aconway 2011-09-13: check if we're owner?
+    bool resubscribe = false;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        assert(consuming);
+        consuming = false;
+        resubscribe = consumers;
+    }
+    if (resubscribe)
         mcast.mcast(framing::ClusterQueueResubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
+    else
+        mcast.mcast(framing::ClusterQueueUnsubscribeBody(
+                        framing::ProtocolVersion(), queue.getName()));
 }
 
 void QueueContext::requeue(uint32_t position, bool redelivered) {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h Fri Nov  4 20:26:52 2011
@@ -23,8 +23,9 @@
  */
 
 #include "LockedMap.h"
-#include "CountdownTimer.h"
+#include "Ticker.h"
 #include "qpid/RefCounted.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/cluster/types.h"
@@ -39,24 +40,24 @@ class QueuedMessage;
 namespace cluster {
 
 class Multicaster;
+class Group;
 
  /**
  * Queue state that is not replicated to the cluster.
  * Manages the local queue start/stop status.
  *
- * Thread safe: Called by connection, dispatch and timer threads.
+* THREAD SAFE: Called by connection threads and Ticker dispatch threads.
  */
-class QueueContext : public RefCounted {
+class QueueContext : public Ticker::Tickable {
   public:
-    QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m);
+    QueueContext(broker::Queue&, Group&, size_t consumeTicks);
     ~QueueContext();
 
     /** Replica state has changed, called in deliver thread.
      * @param before replica state before the event.
      * @param before replica state after the event.
-     * @param self is true if this was a self-delivered event.
      */
-    void replicaState(QueueOwnership before, QueueOwnership after, bool self);
+    void replicaState(QueueOwnership before, QueueOwnership after);
 
     /** Called when queue is stopped, no threads are dispatching.
      * May be called in connection or deliver thread.
@@ -73,8 +74,8 @@ class QueueContext : public RefCounted {
      */
     void cancel(size_t n);
 
-    /** Called in timer thread when the timer runs out. */
-    void timeout();
+    /** Called regularly at the tick interval in an IO thread.*/
+    void tick();
 
     /** Called by MessageHandler to requeue a message. */
     void requeue(uint32_t position, bool redelivered);
@@ -93,13 +94,18 @@ class QueueContext : public RefCounted {
 
 private:
     sys::Mutex lock;
-    CountdownTimer timer;
+    size_t consumers;           // Number of local consumers
+    bool consuming;             // True if we have the lock & local consumers are active
+    size_t ticks;               // Ticks since we got the lock.
+
+    // Following members are immutable
     broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
     Multicaster& mcast;
-    size_t consumers;
     size_t hash;
+    size_t maxTicks;            // Max ticks we are allowed.
 
-    typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; 
+    // Following members are safe to use without holding a lock
+    typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap;
     UnackedMap unacked;
 };
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp Fri Nov  4 20:26:52 2011
@@ -24,6 +24,7 @@
 #include "QueueContext.h"
 #include "QueueHandler.h"
 #include "QueueReplica.h"
+#include "Settings.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
@@ -33,10 +34,8 @@
 namespace qpid {
 namespace cluster {
 
-QueueHandler::QueueHandler(Group& g, const Settings& s)
-    : HandlerBase(g.getEventHandler()),
-      multicaster(g.getMulticaster()),
-      consumeLock(s.getConsumeLock())
+QueueHandler::QueueHandler(Group& g, Settings& s)
+    : HandlerBase(g.getEventHandler()), group(g), consumeTicks(s.consumeTicks)
 {}
 
 bool QueueHandler::handle(const framing::AMQFrame& frame) {
@@ -62,7 +61,7 @@ void QueueHandler::left(const MemberId& 
 void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
     // Local queues already have a context, remote queues need one.
     if (!QueueContext::get(*q))
-        new QueueContext(*q, consumeLock, multicaster); // Context attaches to the Queue
+        new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue
     queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
         new QueueReplica(q, self()));
 }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h Fri Nov  4 20:26:52 2011
@@ -23,7 +23,6 @@
  */
 
 #include "HandlerBase.h"
-#include "Settings.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "boost/shared_ptr.hpp"
 #include "boost/intrusive_ptr.hpp"
@@ -42,6 +41,7 @@ class EventHandler;
 class QueueReplica;
 class Multicaster;
 class Group;
+class Settings;
 
 /**
  * Handler for queue subscription events.
@@ -54,7 +54,7 @@ class QueueHandler : public framing::AMQ
                      public HandlerBase
 {
   public:
-    QueueHandler(Group&, const Settings&);
+    QueueHandler(Group&, Settings&);
 
     bool handle(const framing::AMQFrame& body);
 
@@ -76,8 +76,8 @@ class QueueHandler : public framing::AMQ
     boost::intrusive_ptr<QueueReplica> find(const std::string& queue);
 
     QueueMap queues;
-    Multicaster& multicaster;
-    sys::Duration consumeLock;
+    Group& group;
+    size_t consumeTicks;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp Fri Nov  4 20:26:52 2011
@@ -53,7 +53,7 @@ std::ostream& operator<<(std::ostream& o
 void QueueReplica::subscribe(const MemberId& member) {
     QueueOwnership before = getState();
     subscribers.push_back(member);
-    update(before, member);
+    update(before);
 }
 
 // FIXME aconway 2011-09-20: need to requeue.
@@ -61,7 +61,7 @@ void QueueReplica::unsubscribe(const Mem
     QueueOwnership before = getState();
     MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
     if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
-    update(before, member);
+    update(before);
 }
 
 void QueueReplica::resubscribe(const MemberId& member) {
@@ -69,14 +69,14 @@ void QueueReplica::resubscribe(const Mem
     QueueOwnership before = getState();
     subscribers.pop_front();
     subscribers.push_back(member);
-    update(before, member);
+    update(before);
 }
 
-void QueueReplica::update(QueueOwnership before, MemberId member) {
+void QueueReplica::update(QueueOwnership before) {
     QueueOwnership after = getState();
     QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
                  << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
-    context->replicaState(before, after, member == self);
+    context->replicaState(before, after);
 }
 
 QueueOwnership QueueReplica::getState() const {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h Fri Nov  4 20:26:52 2011
@@ -68,7 +68,7 @@ class QueueReplica : public RefCounted
     QueueOwnership getState() const;
     bool isOwner() const;
     bool isSubscriber(const MemberId&) const;
-    void update(QueueOwnership before, MemberId from);
+    void update(QueueOwnership before);
 
   friend struct PrintSubscribers;
   friend std::ostream& operator<<(std::ostream&, QueueOwnership);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp Fri Nov  4 20:26:52 2011
@@ -26,7 +26,8 @@ namespace qpid {
 namespace cluster {
 
 Settings::Settings() :    // Default settings
-    consumeLockMicros(10000),
+    tick(10000),                // FIXME aconway 2011-11-03: smaller default
+    consumeTicks(2),
     concurrency(sys::SystemInfo::concurrency() + 1)
 {}
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h?rev=1197748&r1=1197747&r2=1197748&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h Fri Nov  4 20:26:52 2011
@@ -34,10 +34,11 @@ namespace cluster {
 struct Settings {
     Settings();
     std::string name;
-    uint32_t consumeLockMicros;
+    uint32_t tick;
+    uint32_t consumeTicks;
     uint32_t concurrency;
 
-    sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; }
+    sys::Duration getTick() const { return tick * sys::TIME_USEC; }
 };
 
 }} // namespace qpid::cluster

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp?rev=1197748&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp Fri Nov  4 20:26:52 2011
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Ticker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace cluster {
+
+Ticker::Tickable::~Tickable() {}
+
+Ticker::Ticker(sys::Duration tick, sys::Timer& timer_,
+               boost::shared_ptr<sys::Poller> poller)
+    : sys::TimerTask(tick, "Cluster ticker"),  timer(timer_),
+      condition(boost::bind(&Ticker::dispatch, this, _1), poller)
+{
+    timer.add(this);
+}
+
+void Ticker::add(boost::intrusive_ptr<Tickable> t) {
+    sys::Mutex::ScopedLock l(lock);
+    tickables.push_back(t);
+}
+
+void Ticker::remove(boost::intrusive_ptr<Tickable> t) {
+    sys::Mutex::ScopedLock l(lock);
+    Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t);
+    if (i != tickables.end()) tickables.erase(i);
+}
+
+// Called by timer thread, sets condition
+void Ticker::fire() {
+    condition.set();
+    setupNextFire();
+    timer.add(this);
+}
+
+// Called only in condition IO thread.
+void Ticker::dispatch(sys::PollableCondition& cond) {
+    assert(&cond == &condition);
+    {
+        sys::Mutex::ScopedLock l(lock);
+        working = tickables;
+    }
+    // This is safe outside the lock see comment in Ticker.h
+    for(Tickables::iterator i = working.begin(); i!= working.end(); ++i)
+        (*i)->tick();
+    condition.clear();          // Ready for next tick.
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h?rev=1197748&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h Fri Nov  4 20:26:52 2011
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_EXP_TICKER_H
+#define QPID_CLUSTER_EXP_TICKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+
+namespace sys {
+class Poller;
+}
+
+namespace cluster {
+
+/**
+ * Generate regular calls to QueueContext::tick.
+ * Work of calling tick is not done in the timer thread.
+ * The timer task triggers a PollableCondition, which calls the ticks.
+ *
+ * THREAD SAFE: add/remove are called in connection or deliver
+ * threads, fire is called in timer thread and tick is called in the
+ * IO thread for the PollableCondition.
+ */
+class Ticker : public  sys::TimerTask
+{
+  public:
+    struct Tickable : public RefCounted { // Interface for objects that want to receive ticks.
+        virtual ~Tickable();
+        virtual void tick() = 0; // Invoked from Ticker::dispatch, i.e. in the condition's IO thread.
+    };
+
+    Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr<sys::Poller>); // tick is passed to the TimerTask base; the constructor adds this task to the timer.
+
+    void add(boost::intrusive_ptr<Tickable>); // Start ticking the given object.
+    void remove(boost::intrusive_ptr<Tickable>); // Stop ticking it; no-op if it was never added.
+
+  private:
+    typedef std::vector<boost::intrusive_ptr<Tickable> > Tickables;
+
+    void fire();                // Called in timer thread; sets condition and re-adds this task to timer.
+    void dispatch(sys::PollableCondition&); // Called in IO thread; calls tick() on each registered Tickable.
+
+    sys::Timer& timer; // Timer this task re-adds itself to after each fire().
+    sys::PollableCondition condition; // Set by fire(), cleared at the end of dispatch().
+
+    sys::Mutex lock; // Protects tickables.
+    Tickables tickables; // Registered objects; modified by add/remove, copied by dispatch.
+
+    // Only accessed in the condition IO thread so no lock needed.
+    // This is a member to keep memory allocated by the vector and
+    // avoid re-allocation each time dispatch copies tickables into it.
+    Tickables working;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_TICKER_H*/

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Ticker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message