qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1150685 - in /qpid/branches/qpid-2920/qpid/cpp/src: qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/sys/Stoppable.h tests/QueueTest.cpp
Date Mon, 25 Jul 2011 13:12:30 GMT
Author: aconway
Date: Mon Jul 25 13:12:29 2011
New Revision: 1150685

URL: http://svn.apache.org/viewvc?rev=1150685&view=rev
Log:
QPID-2920: Allow stopping consumers on queues.

Stop consumers from dispatching and wait for already dispatching consumers to exit.

Added:
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h   (with props)
Modified:
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1150685&r1=1150684&r2=1150685&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jul 25 13:12:29 2011
@@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::sha
 
 bool Queue::dispatch(Consumer::shared_ptr c)
 {
-    QueuedMessage msg(this);
-    if (getNextMessage(msg, c)) {
-        c->deliver(msg);
-        return true;
-    } else {
+    Stoppable::Scope doDispatch(dispatching);
+    if (doDispatch) {
+        QueuedMessage msg(this);
+        if (getNextMessage(msg, c)) {
+            c->deliver(msg);
+            return true;
+        } else {
+            return false;
+        }
+    } else { // Dispatching is stopped
+        Mutex::ScopedLock locker(messageLock);
+        listeners.addListener(c); // FIXME aconway 2011-05-05:
         return false;
     }
 }
@@ -1257,3 +1264,13 @@ void Queue::UsageBarrier::destroy()
     parent.deleted = true;
     while (count) parent.messageLock.wait();
 }
+
+// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
+void Queue::stop() {
+    dispatching.stop();
+}
+
+void Queue::start() {
+    dispatching.start();
+    notifyListener();
+}

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.h?rev=1150685&r1=1150684&r2=1150685&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.h Mon Jul 25 13:12:29 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,7 @@
 #include "qpid/broker/RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Stoppable.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
@@ -70,17 +70,18 @@ class Exchange;
 class Queue : public boost::enable_shared_from_this<Queue>,
               public PersistableQueue, public management::Manageable {
 
+    // Used to prevent destruction of the queue while it is in use.
     struct UsageBarrier
     {
         Queue& parent;
         uint count;
-                
+
         UsageBarrier(Queue&);
         bool acquire();
         void release();
         void destroy();
     };
-            
+
     struct ScopedUse
     {
         UsageBarrier& barrier;
@@ -88,7 +89,7 @@ class Queue : public boost::enable_share
         ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
-            
+
     typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
@@ -129,6 +130,8 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+    // Allow dispatching consumer threads to be stopped.
+    sys::Stoppable dispatching;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -184,8 +187,8 @@ class Queue : public boost::enable_share
     typedef std::vector<shared_ptr> vector;
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
-                             bool autodelete = false, 
-                             MessageStore* const store = 0, 
+                             bool autodelete = false,
+                             MessageStore* const store = 0,
                              const OwnershipToken* const owner = 0,
                              management::Manageable* parent = 0,
                              Broker* broker = 0);
@@ -245,11 +248,11 @@ class Queue : public boost::enable_share
                                     bool exclusive = false);
     QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
 
-    uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>());
//defaults to all messages 
+    uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>());
//defaults to all messages
     QPID_BROKER_EXTERN void purgeExpired();
 
     //move qty # of messages to destination Queue destq
-    uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
+    uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
 
     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
     QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -288,8 +291,8 @@ class Queue : public boost::enable_share
      * Inform queue of messages that were enqueued, have since
      * been acquired but not yet accepted or released (and
      * thus are still logically on the queue) - used in
-     * clustered broker.  
-     */ 
+     * clustered broker.
+     */
     void updateEnqueued(const QueuedMessage& msg);
 
     /**
@@ -300,9 +303,9 @@ class Queue : public boost::enable_share
      * accepted it).
      */
     bool isEnqueued(const QueuedMessage& msg);
-            
+
     /**
-     * Gets the next available message 
+     * Gets the next available message
      */
     QPID_BROKER_EXTERN QueuedMessage get();
 
@@ -377,9 +380,21 @@ class Queue : public boost::enable_share
     void flush();
 
     const Broker* getBroker();
+
+    /** Stop consumers. Return when all consumer threads are stopped.
+     *@pre Queue is active and not already stopping.
+     */
+    void stop();
+
+    /** Start consumers.
+     *@pre Queue is stopped and idle: no thread in dispatch.
+     */
+    void start();
+
+    /** Context data attached and used by cluster code. */
+    boost::intrusive_ptr<qpid::RefCounted> clusterContext;
 };
-}
-}
+}} // qpid::broker
 
 
 #endif  /*!_broker_Queue_h*/

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1150685&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h Mon Jul 25 13:12:29 2011
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_STOPPABLE_H
+#define QPID_SYS_STOPPABLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace sys {
+
+/**
+ * An activity that may be executed by multiple threads, and can be stopped.
+ * Stopping prevents new threads from entering and waits till exiting busy threads leave.
+ */
+class Stoppable {
+  public:
+    Stoppable() : busy(0), stopped(false) {}
+    ~Stoppable() { stop(); }
+
+    /** Mark the scope of a busy thread like this:
+     * <pre>
+     * {
+     *   Stoppable::Scope working(stoppable);
+     *   if (working) { do stuff }
+     * }
+     * </pre>
+     */
+    class Scope {
+        Stoppable& state;
+        bool entered;
+      public:
+        Scope(Stoppable& s) : state(s) { entered = s.enter(); }
+        ~Scope() { if (entered) state.exit(); }
+        operator bool() const { return entered; }
+    };
+
+  friend class Scope;
+
+    /** Mark  stopped, wait for all threads to leave their busy scope. */
+    void stop() {
+        sys::Monitor::ScopedLock l(lock);
+        stopped = true;
+        while (busy > 0) lock.wait();
+    }
+
+    /** Set the state to started.
+     *@pre state is stopped and no theads are busy.
+     */
+    void start() {
+        sys::Monitor::ScopedLock l(lock);
+        assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
+        stopped = false;
+    }
+
+  private:
+    uint busy;
+    bool stopped;
+    sys::Monitor lock;
+
+    bool enter() {
+        sys::Monitor::ScopedLock l(lock);
+        if (!stopped) ++busy;
+        return !stopped;
+    }
+
+    void exit() {
+        sys::Monitor::ScopedLock l(lock);
+        assert(busy > 0);
+        if (--busy == 0) lock.notifyAll();
+    }
+};
+
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_STOPPABLE_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/Stoppable.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp?rev=1150685&r1=1150684&r2=1150685&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp Mon Jul 25 13:12:29 2011
@@ -1,4 +1,4 @@
- /*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -41,6 +41,7 @@
 
 #include <iostream>
 #include "boost/format.hpp"
+#include <boost/enable_shared_from_this.hpp>
 
 using boost::intrusive_ptr;
 using namespace qpid;
@@ -57,16 +58,22 @@ public:
     typedef boost::shared_ptr<TestConsumer> shared_ptr;
 
     intrusive_ptr<Message> last;
-    bool received;
-    TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+    bool received, notified;
+
+    TestConsumer(bool acquire = true):
+        Consumer(acquire), received(false), notified(false) {};
 
     virtual bool deliver(QueuedMessage& msg){
         last = msg.payload;
         received = true;
         return true;
     };
-    void notify() {}
+    void notify() {
+        notified = true;
+    }
+
     OwnershipToken* getSession() { return 0; }
+    void reset() { last = intrusive_ptr<Message>(); received = false; }
 };
 
 class FailOnDeliver : public Deliverable
@@ -303,11 +310,11 @@ QPID_AUTO_TEST_CASE(testSeek){
 
     QueuedMessage qm;
     queue->dispatch(consumer);
-    
+
     BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
     queue->dispatch(consumer);
     queue->dispatch(consumer); // make sure over-run is safe
- 
+
 }
 
 QPID_AUTO_TEST_CASE(testSearch){
@@ -325,15 +332,15 @@ QPID_AUTO_TEST_CASE(testSearch){
 
     SequenceNumber seq(2);
     QueuedMessage qm = queue->find(seq);
-    
+
     BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-    
+
     queue->acquire(qm);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
     SequenceNumber seq1(3);
     QueuedMessage qm1 = queue->find(seq1);
     BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-    
+
 }
 const std::string nullxid = "";
 
@@ -1106,6 +1113,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
     BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
 }
 
+QPID_AUTO_TEST_CASE(testStopStart) {
+    boost::shared_ptr<Queue> q(new Queue("foo"));
+    boost::shared_ptr<TestConsumer> c(new TestConsumer);
+    intrusive_ptr<Message> m = create_message("x","y");
+    q->consume(c);
+    // Initially q is started.
+    q->deliver(m);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK(c->received);
+    c->reset();
+    // Stop q, should not receive message
+    q->stop();
+    q->deliver(m);
+    BOOST_CHECK(!q->dispatch(c));
+    BOOST_CHECK(!c->received);
+    BOOST_CHECK(!c->notified);
+    // Start q, should be notified and delivered
+    q->start();
+    q->deliver(m);
+    BOOST_CHECK(c->notified);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK(c->received);
+}
+
 
 QPID_AUTO_TEST_SUITE_END()
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message