qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1177829 - in /qpid/branches/qpid-2920-active/qpid/cpp/src/qpid: broker/Queue.cpp broker/Queue.h cluster/exp/BrokerContext.cpp sys/Activity.h sys/Stoppable.h
Date Fri, 30 Sep 2011 20:55:40 GMT
Author: aconway
Date: Fri Sep 30 20:55:40 2011
New Revision: 1177829

URL: http://svn.apache.org/viewvc?rev=1177829&view=rev
Log:
QPID-2920: Renamed Stoppable to Activity, clearer naming.

Added:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h   (contents, props changed)
      - copied, changed from r1177676, qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
Removed:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1177829&r1=1177828&r2=1177829&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Fri Sep 30 20:55:40
2011
@@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMes
 {
     while (true) {
         ClusterAcquireScope acquireScope; // Outside the lock
-        Stoppable::Scope consumeScope(consuming);
+        Activity::Scope consumeScope(consuming);
         Mutex::ScopedLock locker(messageLock);
         if (!consumeScope) {
             QPID_LOG(trace, "Queue stopped, can't  consume: " << name);
@@ -1288,6 +1288,8 @@ void Queue::startConsumers() {
     notifyListener();
 }
 
+bool Queue::isConsumingStopped() { return consuming.isStopped(); }
+
 // Called when all busy threads exited after stopConsumers()
 void Queue::consumingStopped() {
     QPID_LOG(trace, "Stopped consumers on " << getName());

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h?rev=1177829&r1=1177828&r2=1177829&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h Fri Sep 30 20:55:40 2011
@@ -31,11 +31,10 @@
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
 #include "qpid/broker/QueueObserver.h"
-
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Activity.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
-#include "qpid/sys/Stoppable.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
@@ -130,7 +129,7 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
-    sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster
+    sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster
     boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
@@ -398,6 +397,9 @@ class Queue : public boost::enable_share
     /** Start consumers. */
     void startConsumers();
 
+    /** Return true if consumers are stopped */
+    bool isConsumingStopped();
+
     /** Context information used in a cluster. */
     boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }
     void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext
= context; }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1177829&r1=1177828&r2=1177829&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Fri Sep
30 20:55:40 2011
@@ -52,9 +52,9 @@ using namespace broker;
 namespace {
 const ProtocolVersion pv;     // shorthand
 
-// noReplicate means the current thread is handling a message
-// received from the cluster so it should not be replicated.
-QPID_TSS bool tssNoReplicate = false;
+// True means the current thread is handling a local event that should be replicated.
+// False means we're handling a cluster event it should not be replicated.
+QPID_TSS bool tssReplicate = true;
 }
 
 // FIXME aconway 2011-09-26: de-const the broker::Cluster interface,
@@ -72,13 +72,13 @@ Multicaster& BrokerContext::mcaster(cons
 }
 
 BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() {
-    assert(!tssNoReplicate);
-    tssNoReplicate = true;
+    assert(tssReplicate);
+    tssReplicate = false;
 }
 
 BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() {
-    assert(tssNoReplicate);
-    tssNoReplicate = false;
+    assert(!tssReplicate);
+    tssReplicate = true;
 }
 
 BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q)
@@ -88,7 +88,7 @@ BrokerContext::~BrokerContext() {}
 
 bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>&
msg)
 {
-    if (tssNoReplicate) return true;
+    if (!tssReplicate) return true;
     // FIXME aconway 2010-10-20: replicate message in fragments
     // (frames), using fixed size bufffers.
     std::string data(msg->encodedSize(),char());
@@ -104,18 +104,18 @@ void BrokerContext::routing(const boost:
 void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {}
 
 void BrokerContext::acquire(const broker::QueuedMessage& qm) {
-    if (tssNoReplicate) return;
-    mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
+    if (tssReplicate)
+        mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
 }
 
 void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
-    if (!tssNoReplicate)
+    if (tssReplicate)
         mcaster(qm).mcast(
             ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
 }
 
 void BrokerContext::requeue(const broker::QueuedMessage& qm) {
-    if (!tssNoReplicate)
+    if (tssReplicate)
         mcaster(qm).mcast(ClusterMessageRequeueBody(
                        pv,
                        qm.queue->getName(),
@@ -125,7 +125,7 @@ void BrokerContext::requeue(const broker
 
 // FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
 void BrokerContext::create(broker::Queue& q) {
-    if (tssNoReplicate) return;
+    if (!tssReplicate) return;
     assert(!QueueContext::get(q));
     boost::intrusive_ptr<QueueContext> context(
         new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName())));
@@ -137,12 +137,12 @@ void BrokerContext::create(broker::Queue
 }
 
 void BrokerContext::destroy(broker::Queue& q) {
-    if (tssNoReplicate) return;
+    if (!tssReplicate) return;
      mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName()));
 }
 
 void BrokerContext::create(broker::Exchange& ex) {
-    if (tssNoReplicate) return;
+    if (!tssReplicate) return;
     std::string data(ex.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     ex.encode(buf);
@@ -150,7 +150,7 @@ void BrokerContext::create(broker::Excha
 }
 
 void BrokerContext::destroy(broker::Exchange& ex) {
-    if (tssNoReplicate) return;
+    if (!tssReplicate) return;
     mcaster(ex.getName()).mcast(
         ClusterWiringDestroyExchangeBody(pv, ex.getName()));
 }
@@ -158,14 +158,14 @@ void BrokerContext::destroy(broker::Exch
 void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex,
                          const std::string& key, const framing::FieldTable& args)
 {
-    if (tssNoReplicate) return;
+    if (!tssReplicate) return;
     mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args));
 }
 
 void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
                            const std::string& key, const framing::FieldTable& args)
 {
-    if (tssNoReplicate) return;
+    if (!tssReplicate) return;
     mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args));
 }
 

Copied: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h (from r1177676, qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h?p2=qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h&p1=qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h&r1=1177676&r2=1177829&rev=1177829&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h Fri Sep 30 20:55:40 2011
@@ -1,5 +1,5 @@
-#ifndef QPID_SYS_STOPPABLE_H
-#define QPID_SYS_STOPPABLE_H
+#ifndef QPID_SYS_ACTIVITY_H
+#define QPID_SYS_ACTIVITY_H
 
 /*
  *
@@ -28,39 +28,42 @@ namespace qpid {
 namespace sys {
 
 /**
- * An activity that may be executed by multiple threads, and can be stopped.
- *
- * Stopping prevents new threads from entering and calls a callback
- * when all busy threads have left.
+ * An activity that may be executed by multiple threads concurrently.
+ * An activity has 3 states:
+ * - active: may have active threads, new threads may enter.
+ * - stopping: may have active threads but  no new threads may enter.
+ * - stopped: no active threads and no new threads may enter.
  */
-class Stoppable {
+class Activity {
   public:
     /**
-     * Initially not stopped.
+     * Initially active.
      *@param stoppedCallback: called when all threads have stopped.
      */
-    Stoppable(boost::function<void()> stoppedCallback)
+    Activity(boost::function<void()> stoppedCallback)
         : busy(0), stopped(false), notify(stoppedCallback) {}
 
-    /** Mark the scope of a busy thread like this:
+    /** Mark the scope of an activity thread like this:
      * <pre>
      * {
-     *   Stoppable::Scope working(stoppable);
-     *   if (working) { do stuff }
+     *   Activity::Scope working(activity);
+     *   if (working) { do stuff } // Only if activity is active.
      * }
      * </pre>
      */
     class Scope {
-        Stoppable& state;
+        Activity& state;
         bool entered;
       public:
-        Scope(Stoppable& s) : state(s) { entered = state.enter(); }
+        Scope(Activity& s) : state(s) { entered = state.enter(); }
         ~Scope() { if (entered) state.exit(); }
         operator bool() const { return entered; }
     };
 
   friend class Scope;
 
+    // FIXME aconway 2011-09-30: fix pre-conditions with asserts, don't allow
+    // multiple stops/starts.
     /**
      * Set state to "stopped", so no new threads can enter.
      * Notify function will be called when all busy threads have left.
@@ -80,8 +83,13 @@ class Stoppable {
         stopped = false;
     }
 
-  private:
+    /** True if Activity is stopped with no */
+    bool isStopped() {
+        sys::Monitor::ScopedLock l(lock);
+        return stopped && busy == 0;
+    }
 
+  private:
     // Busy thread enters scope
     bool enter() {
         sys::Monitor::ScopedLock l(lock);
@@ -110,4 +118,4 @@ class Stoppable {
 
 }} // namespace qpid::sys
 
-#endif  /*!QPID_SYS_STOPPABLE_H*/
+#endif  /*!QPID_SYS_ACTIVITY_H*/

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message