qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1071013 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Queue.cpp cpp/src/qpid/broker/Queue.h tests/src/py/qpid_tests/broker_0_10/__init__.py tests/src/py/qpid_tests/broker_0_10/extensions.py
Date Tue, 15 Feb 2011 18:58:44 GMT
Author: gsim
Date: Tue Feb 15 18:58:44 2011
New Revision: 1071013

URL: http://svn.apache.org/viewvc?rev=1071013&view=rev
Log:
QPID-3000: Added optional delay for auto-deletion

Added:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1071013&r1=1071012&r2=1071013&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Feb 15 18:58:44 2011
@@ -77,6 +77,7 @@ const std::string qpidLastValueQueueNoBr
 const std::string qpidPersistLastNode("qpid.persist_last_node");
 const std::string qpidVQMatchProperty("qpid.LVQ_key");
 const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
 //following feature is not ready for general use as it doesn't handle
 //the case where a message is enqueued on more than one queue well enough:
 const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -108,7 +109,8 @@ Queue::Queue(const string& _name, bool _
     insertSeqNo(0),
     broker(b),
     deleted(false),
-    barrier(*this)
+    barrier(*this),
+    autoDeleteTimeout(0)
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -398,6 +400,10 @@ void Queue::consume(Consumer::shared_ptr
     consumerCount++;
     if (mgmtObject != 0)
         mgmtObject->inc_consumerCount ();
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
+    }
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
@@ -567,7 +573,7 @@ uint32_t Queue::getConsumerCount() const
 bool Queue::canAutoDelete() const
 {
     Mutex::ScopedLock locker(consumerLock);
-    return autodelete && !consumerCount;
+    return autodelete && !consumerCount && !owner;
 }
 
 void Queue::clearLastNodeFailure()
@@ -726,6 +732,28 @@ void Queue::create(const FieldTable& _se
     configure(_settings);
 }
 
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string&
key)
+{
+    qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+    if (!v) {
+        return 0;
+    } else if (v->convertsTo<int>()) {
+        return v->get<int>();
+    } else if (v->convertsTo<std::string>()){
+        std::string s = v->get<std::string>();
+        try { 
+            return boost::lexical_cast<int>(s); 
+        } catch(const boost::bad_lexical_cast&) {
+            QPID_LOG(warning, "Ignoring invalid integer value for " << key <<
": " << s);
+            return 0;
+        }
+    } else {
+        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ":
" << *v);
+        return 0;
+    }
+}
+
 void Queue::configure(const FieldTable& _settings, bool recovering)
 {
 
@@ -787,6 +815,10 @@ void Queue::configure(const FieldTable& 
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
+    autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+    if (autoDeleteTimeout) 
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout="
<< autoDeleteTimeout); 
+
     if (mgmtObject != 0)
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
 
@@ -813,6 +845,7 @@ void Queue::destroy()
         store->destroy(*this);
         store = 0;//ensure we make no more calls to the store for this queue
     }
+    if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
 }
 
 void Queue::notifyDeleted()
@@ -917,15 +950,46 @@ boost::shared_ptr<Exchange> Queue::getAl
     return alternateExchange;
 }
 
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
     if (broker.getQueues().destroyIf(queue->getName(), 
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete),
queue))) {
+        QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->unbind(broker.getExchanges(), queue);
         queue->destroy();
     }
 }
 
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+    Broker& broker;
+    Queue::shared_ptr queue;
+
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+    void fire()
+    {
+        //need to detect case where queue was used after the task was
+        //created, but then became unused again before the task fired;
+        //in this case ignore this request as there will have already
+        //been a later task added
+        tryAutoDeleteImpl(broker, queue);
+    }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+    if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+        AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker,
queue, time));
+        broker.getClusterTimer().add(queue->autoDeleteTask);        
+        QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << "
initiated");
+    } else {
+        tryAutoDeleteImpl(broker, queue);
+    }
+}
+
 bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
 { 
     Mutex::ScopedLock locker(ownershipLock);
@@ -940,6 +1004,10 @@ void Queue::releaseExclusiveOwnership() 
 
 bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
 { 
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
+    }
     Mutex::ScopedLock locker(ownershipLock);
     if (owner) {
         return false;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1071013&r1=1071012&r2=1071013&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Feb 15 18:58:44 2011
@@ -36,6 +36,7 @@
 
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
 #include "qpid/framing/amqp_types.h"
@@ -126,6 +127,8 @@ class Queue : public boost::enable_share
     Broker* broker;
     bool deleted;
     UsageBarrier barrier;
+    int autoDeleteTimeout;
+    boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1071013&r1=1071012&r2=1071013&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Tue Feb 15 18:58:44 2011
@@ -32,3 +32,4 @@ from tx import *
 from lvq import *
 from priority import *
 from threshold import *
+from extensions import *

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py?rev=1071013&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py Tue Feb 15 18:58:44
2011
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase010
+from time import sleep
+
+class ExtensionTests(TestBase010):
+    """Tests for various extensions to AMQP 0-10"""
+
+    def test_timed_autodelete(self):
+        session = self.session
+        session2 = self.conn.session("another-session")
+        session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5})
+        session2.close()
+        result = session.queue_query(queue="my-queue")
+        self.assertEqual("my-queue", result.queue)
+        sleep(5)
+        result = session.queue_query(queue="my-queue")
+        self.assert_(not result.queue)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message