qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1072179 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/management/ cpp/src/tests/ java/broker/src/main/java/org/apache/qpid/qmf/ specs/
Date Fri, 18 Feb 2011 22:38:06 GMT
Author: gsim
Date: Fri Feb 18 22:38:05 2011
New Revision: 1072179

URL: http://svn.apache.org/viewvc?rev=1072179&view=rev
Log:
QPID-3015: Added create and delete methods to management schema for broker

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-ctrl
    qpid/trunk/qpid/cpp/src/tests/sender.cpp
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Feb 18 22:38:05 2011
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
@@ -33,10 +34,19 @@
 #include "qpid/broker/ExpiryPolicy.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/log/Logger.h"
@@ -44,7 +54,9 @@
 #include "qpid/log/Statement.h"
 #include "qpid/log/posix/SinkOptions.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/ProtocolFactory.h"
 #include "qpid/sys/Poller.h"
@@ -76,7 +88,10 @@ using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
+using qpid::management::getManagementExecutionContext;
+using qpid::types::Variant;
 using std::string;
+using std::make_pair;
 
 namespace _qmf = qmf::org::apache::qpid::broker;
 
@@ -443,6 +458,20 @@ Manageable::status_t Broker::ManagementM
         QPID_LOG (debug, "Broker::getLogLevel()");
         status = Manageable::STATUS_OK;
         break;
+    case _qmf::Broker::METHOD_CREATE :
+      {
+          _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
+          createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext());
+          status = Manageable::STATUS_OK;
+          break;
+      }
+    case _qmf::Broker::METHOD_DELETE :
+      {
+          _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
+          deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext());
+          status = Manageable::STATUS_OK;
+          break;
+      }
    default:
         QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
         status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,6 +481,169 @@ Manageable::status_t Broker::ManagementM
     return status;
 }
 
+namespace
+{
+const std::string TYPE_QUEUE("queue");
+const std::string TYPE_EXCHANGE("exchange");
+const std::string TYPE_TOPIC("topic");
+const std::string TYPE_BINDING("binding");
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+const std::string QUEUE_NAME("queue");
+const std::string EXCHANGE_NAME("exchange");
+
+const std::string TRUE("true");
+const std::string FALSE("false");
+}
+
+struct InvalidBindingIdentifier : public qpid::Exception
+{
+    InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
+    std::string getPrefix() const { return "invalid binding"; }
+};
+
+struct BindingIdentifier
+{
+    std::string exchange;
+    std::string queue;
+    std::string key;
+
+    BindingIdentifier(const std::string& name)
+    {
+        std::vector<std::string> path;
+        split(path, name, "/");
+        switch (path.size()) {
+          case 1:
+            queue = path[0];
+            break;
+          case 2:
+            exchange = path[0];
+            queue = path[1];
+            break;
+          case 3:
+            exchange = path[0];
+            queue = path[1];
+            key = path[2];
+            break;
+          default:
+            throw InvalidBindingIdentifier(name);
+        }
+    }
+};
+
+struct ObjectAlreadyExists : public qpid::Exception
+{
+    ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
+    std::string getPrefix() const { return "object already exists"; }
+};
+
+struct UnknownObjectType : public qpid::Exception
+{
+    UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
+    std::string getPrefix() const { return "unknown object type"; }
+};
+
+void Broker::createObject(const std::string& type, const std::string& name,
+                          const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    //TODO: implement 'strict' option (check there are no unrecognised properties)
+    QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
+    if (type == TYPE_QUEUE) {
+        bool durable(false);
+        bool autodelete(false);
+        std::string alternateExchange;
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == DURABLE) durable = i->second;
+            else if (i->first == AUTO_DELETE) autodelete = i->second;
+            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        std::pair<boost::shared_ptr<Queue>, bool> result =
+            createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+        if (!result.second) {
+            throw ObjectAlreadyExists(name);
+        }
+    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+        bool durable(false);
+        std::string exchangeType;
+        std::string alternateExchange;
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, exchange-type and alternate-exchange properties
+            if (i->first == DURABLE) durable = i->second;
+            else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        try {
+            std::pair<boost::shared_ptr<Exchange>, bool> result =
+                createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId);
+            if (!result.second) {
+                throw ObjectAlreadyExists(name);
+            }
+        } catch (const UnknownExchangeTypeException&) {
+            throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
+        }
+    } else if (type == TYPE_BINDING) {
+        BindingIdentifier binding(name);
+        std::string exchangeType("topic");
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract the exchange-type property
+            if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+    } else {
+        throw UnknownObjectType(type);
+    }
+}
+
+void Broker::deleteObject(const std::string& type, const std::string& name,
+                          const Variant::Map& options, const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
+    if (type == TYPE_QUEUE) {
+        deleteQueue(name, userId, connectionId);
+    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+        deleteExchange(name, userId, connectionId);
+    } else if (type == TYPE_BINDING) {
+        BindingIdentifier binding(name);
+        unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+    } else {
+        throw UnknownObjectType(type);
+    }
+
+}
+
 void Broker::setLogLevel(const std::string& level)
 {
     QPID_LOG(notice, "Changing log level to " << level);
@@ -466,7 +658,7 @@ std::string Broker::getLogLevel()
     const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
     for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
         if (i != selectors.begin()) level += std::string(",");
-        level += *i;        
+        level += *i;
     }
     return level;
 }
@@ -552,5 +744,221 @@ void Broker::setClusterTimer(std::auto_p
 
 const std::string Broker::TCP_TRANSPORT("tcp");
 
+
+std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
+    const std::string& name,
+    bool durable,
+    bool autodelete,
+    const OwnershipToken* owner,
+    const std::string& alternateExchange,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& userId,
+    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE));
+        params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? TRUE : FALSE));
+        params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? TRUE : FALSE));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+        if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = exchanges.get(alternateExchange);
+        if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+    }
+
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner);
+    if (result.second) {
+        if (alternate) {
+            result.first->setAlternateExchange(alternate);
+            alternate->incAlternateUsers();
+        }
+
+        //apply settings & create persistent record if required
+        result.first->create(arguments);
+        //add default binding:
+        result.first->bind(exchanges.getDefault(), name);
+
+        if (managementAgent.get()) {
+            //TODO: debatable whether we should raise an event here for
+            //create when this is a 'declare' event; ideally add a create
+            //event instead?
+            managementAgent->raiseEvent(
+                _qmf::EventQueueDeclare(connectionId, userId, name,
+                                        durable, owner, autodelete,
+                                        ManagementAgent::toMap(arguments),
+                                        "created"));
+        }
+    }
+    return result;
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+                         const std::string& connectionId, QueueFunctor check)
+{
+    if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(name);
+    if (queue) {
+        if (check) check(queue);
+        queues.destroy(name);
+        queue->destroyed();
+    } else {
+        throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
+    }
+
+    if (managementAgent.get())
+        managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+
+}
+
+std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
+    const std::string& name,
+    const std::string& type,
+    bool durable,
+    const std::string& alternateExchange,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& userId,
+    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_TYPE, type));
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE));
+        if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = exchanges.get(alternateExchange);
+        if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+    }
+
+    std::pair<Exchange::shared_ptr, bool> result;
+    result = exchanges.declare(name, type, durable, arguments);
+    if (result.second) {
+        if (alternate) {
+            result.first->setAlternate(alternate);
+            alternate->incAlternateUsers();
+        }
+        if (durable) {
+            store->create(*result.first, arguments);
+        }
+        if (managementAgent.get()) {
+            //TODO: debatable whether we should raise an event here for
+            //create when this is a 'declare' event; ideally add a create
+            //event instead?
+            managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
+                                                         userId,
+                                                         name,
+                                                         type,
+                                                         alternateExchange,
+                                                         durable,
+                                                         false,
+                                                         ManagementAgent::toMap(arguments),
+                                                         "created"));
+        }
+    }
+    return result;
+}
+
+void Broker::deleteExchange(const std::string& name, const std::string& userId,
+                           const std::string& connectionId)
+{
+    if (acl) {
+        if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
+    }
+
+    Exchange::shared_ptr exchange(exchanges.get(name));
+    if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
+    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    if (exchange->isDurable()) store->destroy(*exchange);
+    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+    exchanges.destroy(name);
+
+    if (managementAgent.get())
+        managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+
+}
+
+void Broker::bind(const std::string& queueName,
+                  const std::string& exchangeName,
+                  const std::string& key,
+                  const qpid::framing::FieldTable& arguments,
+                  const std::string& userId,
+                  const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+
+        if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(queueName);
+    Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+    if (!queue) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+    } else if (!exchange) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+    } else {
+        if (queue->bind(exchange, key, arguments)) {
+            if (managementAgent.get()) {
+                managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
+                                                  queueName, key, ManagementAgent::toMap(arguments)));
+            }
+        }
+    }
+}
+
+void Broker::unbind(const std::string& queueName,
+                    const std::string& exchangeName,
+                    const std::string& key,
+                    const std::string& userId,
+                    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+        if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(queueName);
+    Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+    if (!queue) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+    } else if (!exchange) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+    } else {
+        if (exchange->unbind(queue, key, 0)) {
+            if (exchange->isDurable() && queue->isDurable()) {
+                store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+            }
+            if (managementAgent.get()) {
+                managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
+            }
+        }
+    }
+}
+
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Feb 18 22:38:05 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,6 +49,7 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Timer.h"
+#include "qpid/types/Variant.h"
 #include "qpid/RefCounted.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/sys/Mutex.h"
@@ -57,7 +58,7 @@
 #include <string>
 #include <vector>
 
-namespace qpid { 
+namespace qpid {
 
 namespace sys {
     class ProtocolFactory;
@@ -68,6 +69,7 @@ struct Url;
 
 namespace broker {
 
+class ConnectionState;
 class ExpiryPolicy;
 class Message;
 
@@ -80,7 +82,7 @@ struct NoSuchTransportException : qpid::
 };
 
 /**
- * A broker instance. 
+ * A broker instance.
  */
 class Broker : public sys::Runnable, public Plugin::Target,
                public management::Manageable,
@@ -148,6 +150,10 @@ public:
     void setStore ();
     void setLogLevel(const std::string& level);
     std::string getLogLevel();
+    void createObject(const std::string& type, const std::string& name,
+                      const qpid::types::Variant::Map& properties, bool lenient, const ConnectionState* context);
+    void deleteObject(const std::string& type, const std::string& name,
+                      const qpid::types::Variant::Map& options, const ConnectionState* context);
 
     boost::shared_ptr<sys::Poller> poller;
     sys::Timer timer;
@@ -179,7 +185,7 @@ public:
     bool clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
-    
+
   public:
     virtual ~Broker();
 
@@ -277,7 +283,7 @@ public:
     bool isClusterUpdatee() const { return clusterUpdatee; }
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
-    
+
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}
 
     /**
@@ -290,6 +296,42 @@ public:
                           const boost::intrusive_ptr<Message>& msg)> deferDelivery;
 
     bool isAuthenticating ( ) { return config.auth; }
+
+    typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
+
+    std::pair<boost::shared_ptr<Queue>, bool> createQueue(
+        const std::string& name,
+        bool durable,
+        bool autodelete,
+        const OwnershipToken* owner,
+        const std::string& alternateExchange,
+        const qpid::framing::FieldTable& arguments,
+        const std::string& userId,
+        const std::string& connectionId);
+    void deleteQueue(const std::string& name,
+                     const std::string& userId,
+                     const std::string& connectionId,
+                     QueueFunctor check = QueueFunctor());
+    std::pair<Exchange::shared_ptr, bool> createExchange(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const std::string& alternateExchange,
+        const qpid::framing::FieldTable& args,
+        const std::string& userId, const std::string& connectionId);
+    void deleteExchange(const std::string& name, const std::string& userId,
+                        const std::string& connectionId);
+    void bind(const std::string& queue,
+              const std::string& exchange,
+              const std::string& key,
+              const qpid::framing::FieldTable& arguments,
+              const std::string& userId,
+              const std::string& connectionId);
+    void unbind(const std::string& queue,
+                const std::string& exchange,
+                const std::string& key,
+                const std::string& userId,
+                const std::string& connectionId);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Feb 18 22:38:05 2011
@@ -444,7 +444,7 @@ void Queue::purgeExpired()
             Mutex::ScopedLock locker(messageLock);
             messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
         }
-        for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+        for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 }
 
@@ -826,8 +826,9 @@ void Queue::configure(const FieldTable& 
       store->create(*this, _settings);
 }
 
-void Queue::destroy()
+void Queue::destroyed()
 {
+    unbind(broker->getExchanges());
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
         while(!messages->empty()){
@@ -846,6 +847,7 @@ void Queue::destroy()
         store = 0;//ensure we make no more calls to the store for this queue
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
+    notifyDeleted();
 }
 
 void Queue::notifyDeleted()
@@ -865,9 +867,9 @@ void Queue::bound(const string& exchange
     bindings.add(exchange, key, args);
 }
 
-void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
+void Queue::unbind(ExchangeRegistry& exchanges)
 {
-    bindings.unbind(exchanges, shared_ref);
+    bindings.unbind(exchanges, shared_from_this());
 }
 
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -955,8 +957,7 @@ void tryAutoDeleteImpl(Broker& broker, Q
     if (broker.getQueues().destroyIf(queue->getName(), 
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
-        queue->unbind(broker.getExchanges(), queue);
-        queue->destroy();
+        queue->destroyed();
     }
 }
 
@@ -1175,6 +1176,20 @@ void Queue::flush()
     if (u.acquired && store) store->flush(*this);
 }
 
+bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+                 const qpid::framing::FieldTable& arguments)
+{
+    if (exchange->bind(shared_from_this(), key, &arguments)) {
+        bound(exchange->getName(), key, arguments);
+        if (exchange->isDurable() && isDurable()) {
+            store->bind(*exchange, *this, key, arguments);
+        }
+        return true;
+    } else {
+        return false;
+    }
+}
+
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Feb 18 22:38:05 2011
@@ -172,8 +172,9 @@ class Queue : public boost::enable_share
             }
         }
     }
-            
+
     void checkNotDeleted();
+    void notifyDeleted();
 
   public:
 
@@ -196,13 +197,17 @@ class Queue : public boost::enable_share
     // "recovering" means we are doing a MessageStore recovery.
     QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
                                       bool recovering = false);
-    void destroy();
-    void notifyDeleted();
+    void destroyed();
     QPID_BROKER_EXTERN void bound(const std::string& exchange,
                                   const std::string& key,
                                   const qpid::framing::FieldTable& args);
-    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
-                                   Queue::shared_ptr shared_ref);
+    //TODO: get unbind out of the public interface; only there for purposes of one unit test
+    void unbind(ExchangeRegistry& exchanges);
+    /**
+     * Bind self to specified exchange, and record that binding for unbinding on delete.
+     */
+    bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+              const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
 
     QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
     QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Feb 18 22:38:05 2011
@@ -64,51 +64,54 @@ void SessionAdapter::ExchangeHandlerImpl
                                                   const string& alternateExchange, 
                                                   bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
 
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_TYPE, type));
-        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
-        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
-    }
-    
     //TODO: implement autoDelete
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
         alternate = getBroker().getExchanges().get(alternateExchange);
     }
     if(passive){
+        AclModule* acl = getBroker().getAcl();
+        if (acl) {
+            //TODO: why does a passive declare require create
+            //permission? The purpose of the passive flag is to state
+            //that the exchange should *not* be created. For
+            //authorisation a passive declare is similar to
+            //exchange-query.
+            std::map<acl::Property, std::string> params;
+            params.insert(make_pair(acl::PROP_TYPE, type));
+            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+            params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+        }
         Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
         checkAlternate(actual, alternate);
-    }else{        
+    }else{
         if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
             throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
         }
         try{
-            std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
-            if (response.second) {
-                if (alternate) {
-                    response.first->setAlternate(alternate);
-                    alternate->incAlternateUsers();
-                }
-                if (durable) {
-                    getBroker().getStore().create(*response.first, args);
-                }
-            } else {
+            std::pair<Exchange::shared_ptr, bool> response =
+                getBroker().createExchange(exchange, type, durable, alternateExchange, args,
+                                           getConnection().getUserId(), getConnection().getUrl());
+            if (!response.second) {
+                //exchange already there, not created
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
+                ManagementAgent* agent = getBroker().getManagementAgent();
+                if (agent)
+                    agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
+                                                                 getConnection().getUserId(),
+                                                                 exchange,
+                                                                 type,
+                                                                 alternateExchange,
+                                                                 durable,
+                                                                 false,
+                                                                 ManagementAgent::toMap(args),
+                                                                 "existing"));
             }
-
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
-                                                             alternateExchange, durable, false, ManagementAgent::toMap(args),
-                                                             response.second ? "created" : "existing"));
-
         }catch(UnknownExchangeTypeException& /*e*/){
             throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
         }
@@ -134,22 +137,8 @@ void SessionAdapter::ExchangeHandlerImpl
                 
 void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
-    }
-
-    //TODO: implement unused
-    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
-    if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
-    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
-    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    getBroker().getExchanges().destroy(name);
-
-    ManagementAgent* agent = getBroker().getManagementAgent();
-    if (agent)
-        agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+    //TODO: implement if-unused
+    getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
 }
 
 ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -169,67 +158,19 @@ ExchangeQueryResult SessionAdapter::Exch
 }
 
 void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, 
-                                           const string& exchangeName, const string& routingKey, 
-                                           const FieldTable& arguments)
+                                               const string& exchangeName, const string& routingKey, 
+                                               const FieldTable& arguments)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
-        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
-    }
-
-    Queue::shared_ptr queue = getQueue(queueName);
-    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
-    if(exchange){
-        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
-        if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
-            queue->bound(exchangeName, routingKey, arguments);
-            if (exchange->isDurable() && queue->isDurable()) {
-                getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
-            }
-
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
-                                                  queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
-        }
-    }else{
-        throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
-    }
+    getBroker().bind(queueName, exchangeName, routingKey, arguments,
+                     getConnection().getUserId(), getConnection().getUrl());
 }
  
 void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
                                                  const string& exchangeName,
                                                  const string& routingKey)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
-        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
-    }
-
-    Queue::shared_ptr queue = getQueue(queueName);
-    if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
-    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
-    if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
-    //TODO: revise unbind to rely solely on binding key (not args)
-    if (exchange->unbind(queue, routingKey, 0)) {
-        if (exchange->isDurable() && queue->isDurable())
-            getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
-
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
-    }
+    getBroker().unbind(queueName, exchangeName, routingKey,
+                       getConnection().getUserId(), getConnection().getUrl());
 }
 
 ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -332,52 +273,42 @@ QueueQueryResult SessionAdapter::QueueHa
 void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
                                                bool passive, bool durable, bool exclusive, 
                                                bool autoDelete, const qpid::framing::FieldTable& arguments)
-{ 
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
-        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
-        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
-        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
-    }
-
-    Exchange::shared_ptr alternate;
-    if (!alternateExchange.empty()) {
-        alternate = getBroker().getExchanges().get(alternateExchange);
-    }
+{
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-    queue = getQueue(name);
+        AclModule* acl = getBroker().getAcl();
+        if (acl) {
+            //TODO: why does a passive declare require create
+            //permission? The purpose of the passive flag is to state
+            //that the queue should *not* be created. For
+            //authorisation a passive declare is similar to
+            //queue-query (or indeed a qmf query).
+            std::map<acl::Property, std::string> params;
+            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+            params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+            params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+            params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+                throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+        }
+        queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
-        std::pair<Queue::shared_ptr, bool> queue_created =  
-            getBroker().getQueues().declare(name, durable,
-                                            autoDelete,
-                                            exclusive ? &session : 0);
+        std::pair<Queue::shared_ptr, bool> queue_created =
+            getBroker().createQueue(name, durable,
+                                    autoDelete,
+                                    exclusive ? &session : 0,
+                                    alternateExchange,
+                                    arguments,
+                                    getConnection().getUserId(),
+                                    getConnection().getUrl());
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
-            if (alternate) {
-                queue->setAlternateExchange(alternate);
-                alternate->incAlternateUsers();
-            }
-
-            //apply settings & create persistent record if required
-            try { queue_created.first->create(arguments); }
-            catch (...) { getBroker().getQueues().destroy(name); throw; }
-
-            //add default binding:
-            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
-            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
             //handle automatic cleanup:
             if (exclusive) {
                 exclusiveQueues.push_back(queue);
@@ -386,21 +317,20 @@ void SessionAdapter::QueueHandlerImpl::d
             if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
+            ManagementAgent* agent = getBroker().getManagementAgent();
+            if (agent)
+                agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+                                                      "existing"));
         }
 
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
-                                                      queue_created.second ? "created" : "existing"));
     }
 
-    if (exclusive && !queue->isExclusiveOwner(&session)) 
+    if (exclusive && !queue->isExclusiveOwner(&session))
         throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
                                                << queue->getName()));
-} 
-        
-        
+}
+
 void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
     AclModule* acl = getBroker().getAcl();
     if (acl)
@@ -409,40 +339,32 @@ void SessionAdapter::QueueHandlerImpl::p
              throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
     }
     getQueue(queue)->purge();
-} 
-        
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
-
-    AclModule* acl = getBroker().getAcl();
-    if (acl)
-    {
-         if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
-             throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
-    }
+}
 
-    Queue::shared_ptr q = getQueue(queue);
-    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
+void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
+{
+    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
         throw ResourceLockedException(QPID_MSG("Cannot delete queue "
-                                               << queue << "; it is exclusive to another session"));
-    if(ifEmpty && q->getMessageCount() > 0){
-        throw PreconditionFailedException("Queue not empty.");
-    }else if(ifUnused && q->getConsumerCount() > 0){
-        throw PreconditionFailedException("Queue in use.");
-    }else{
+                                               << queue->getName() << "; it is exclusive to another session"));
+    } else if(ifEmpty && queue->getMessageCount() > 0) {
+        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+                                                   << queue->getName() << "; queue not empty"));
+    } else if(ifUnused && queue->getConsumerCount() > 0) {
+        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+                                                   << queue->getName() << "; queue in use"));
+    } else if (queue->isExclusiveOwner(&getConnection())) {
         //remove the queue from the list of exclusive queues if necessary
-        if(q->isExclusiveOwner(&getConnection())){
-            QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
-            if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
-        }
-        q->destroy();
-        getBroker().getQueues().destroy(queue);
-        q->unbind(getBroker().getExchanges(), q);
-
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
-        q->notifyDeleted();
-    }
+        QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(),
+                                            getConnection().exclusiveQueues.end(),
+                                            queue);
+        if (i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+    }    
+}
+        
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
+{
+    getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+                            boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
 } 
 
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Feb 18 22:38:05 2011
@@ -138,6 +138,7 @@ class Queue;
         bool isLocal(const ConnectionToken* t) const; 
 
         void destroyExclusiveQueues();
+        void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
         template <class F> void eachExclusiveQueue(F f) 
         { 
             std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Feb 18 22:38:05 2011
@@ -31,6 +31,7 @@
 #include <qpid/broker/Message.h>
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/types/Variant.h"
@@ -2237,6 +2238,7 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
+    setManagementExecutionContext((const qpid::broker::ConnectionState*) msg.getPublisher());
     const framing::FieldTable *headers = msg.getApplicationHeaders();
     if (headers && msg.getAppId() == "qmf2")
     {
@@ -3085,3 +3087,21 @@ bool ManagementAgent::moveDeletedObjects
     }
     return !deleteList.empty();
 }
+
+namespace qpid {
+namespace management {
+
+namespace {
+QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
+}
+
+void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
+{
+    executionContext = ctxt;
+}
+const qpid::broker::ConnectionState* getManagementExecutionContext()
+{
+    return executionContext;
+}
+
+}}

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Feb 18 22:38:05 2011
@@ -41,6 +41,9 @@
 #include <map>
 
 namespace qpid {
+namespace broker {
+class ConnectionState;
+}
 namespace management {
 
 class ManagementAgent
@@ -422,6 +425,8 @@ private:
     void debugSnapshot(const char* title);
 };
 
+void setManagementExecutionContext(const qpid::broker::ConnectionState*);
+const qpid::broker::ConnectionState* getManagementExecutionContext();
 }}
-            
+
 #endif  /*!_ManagementAgent_*/

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h Fri Feb 18 22:38:05 2011
@@ -27,15 +27,19 @@
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/Sender.h"
 #include "qpid/messaging/Receiver.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
 
 namespace qpid {
 namespace tests {
 
+using qpid::types::Variant;
+
 struct BrokerAdmin
 {
     qpid::client::Connection connection;
@@ -223,6 +227,119 @@ inline void receive(messaging::Receiver&
     }
 }
 
+
+class MethodInvoker
+{
+  public:
+    MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+                                      sender(session.createSender("qmf.default.direct/broker")),
+                                      receiver(session.createReceiver(replyTo)) {}
+
+    void createExchange(const std::string& name, const std::string& type, bool durable=false)
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="exchange";
+        params["properties"] = Variant::Map();
+        params["properties"].asMap()["exchange-type"] = type;
+        params["properties"].asMap()["durable"] = durable;
+        methodRequest("create", params);
+    }
+
+    void deleteExchange(const std::string& name)
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="exchange";
+        methodRequest("delete", params);
+    }
+
+    void createQueue(const std::string& name, bool durable=false, bool autodelete=false,
+                     const Variant::Map& options=Variant::Map())
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="queue";
+        params["properties"] = options;
+        params["properties"].asMap()["durable"] = durable;
+        params["properties"].asMap()["auto-delete"] = autodelete;
+        methodRequest("create", params);
+    }
+
+    void deleteQueue(const std::string& name)
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="queue";
+        methodRequest("delete", params);
+    }
+
+    void bind(const std::string& exchange, const std::string& queue, const std::string& key,
+                       const Variant::Map& options=Variant::Map())
+    {
+        Variant::Map params;
+        params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+        params["type"]="binding";
+        params["properties"] = options;
+        methodRequest("create", params);
+    }
+
+    void unbind(const std::string& exchange, const std::string& queue, const std::string& key)
+    {
+        Variant::Map params;
+        params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+        params["type"]="binding";
+        methodRequest("delete", params);
+    }
+
+    void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+    {
+        Variant::Map content;
+        Variant::Map objectId;
+        objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+        content["_object_id"] = objectId;
+        content["_method_name"] = method;
+        content["_arguments"] = inParams;
+
+        messaging::Message request;
+        request.setReplyTo(replyTo);
+        request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
+        request.getProperties()["qmf.opcode"] = "_method_request";
+        encode(content, request);
+
+        sender.send(request);
+
+        messaging::Message response;
+        if (receiver.fetch(response, messaging::Duration::SECOND*5)) {
+            if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
+                std::string opcode = response.getProperties()["qmf.opcode"];
+                if (opcode == "_method_response") {
+                    if (outParams) {
+                        Variant::Map m;
+                        decode(response, m);
+                        *outParams = m["_arguments"].asMap();
+                    }
+                } else if (opcode == "_exception") {
+                    Variant::Map m;
+                    decode(response, m);
+                    throw Exception(QPID_MSG("Error: " << m["_values"]));
+                } else {
+                    throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
+                }
+            } else {
+                throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
+                                         << response.getProperties()["x-amqp-0-10.app-id"]));
+            }
+        } else {
+            throw Exception(QPID_MSG("No response received"));
+        }
+    }
+  private:
+    messaging::Address replyTo;
+    messaging::Sender sender;
+    messaging::Receiver receiver;
+};
+
 }} // namespace qpid::tests
 
 #endif  /*!TESTS_MESSAGINGFIXTURE_H*/

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Fri Feb 18 22:38:05 2011
@@ -890,6 +890,53 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
     BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
 }
 
+QPID_AUTO_TEST_CASE(testQmfCreateAndDelete)
+{
+    MessagingFixture fix(Broker::Options(), true/*enable management*/);
+    MethodInvoker control(fix.session);
+    control.createQueue("my-queue");
+    control.createExchange("my-exchange", "topic");
+    control.bind("my-exchange", "my-queue", "subject1");
+
+    Sender sender = fix.session.createSender("my-exchange");
+    Receiver receiver = fix.session.createReceiver("my-queue");
+    Message out;
+    out.setSubject("subject1");
+    out.setContent("one");
+    sender.send(out);
+    Message in;
+    BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+    BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+    control.unbind("my-exchange", "my-queue", "subject1");
+    control.bind("my-exchange", "my-queue", "subject2");
+
+    out.setContent("two");
+    sender.send(out);//should be dropped
+
+    out.setSubject("subject2");
+    out.setContent("three");
+    sender.send(out);//should not be dropped
+
+    BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+    BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+    BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE));
+    sender.close();
+    receiver.close();
+
+    control.deleteExchange("my-exchange");
+    messaging::Session other = fix.connection.createSession();
+    {
+    ScopedSuppressLogging sl;
+    BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound);
+    }
+    control.deleteQueue("my-queue");
+    other = fix.connection.createSession();
+    {
+    ScopedSuppressLogging sl;
+    BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound);
+    }
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Feb 18 22:38:05 2011
@@ -244,7 +244,7 @@ QPID_AUTO_TEST_CASE(testBound){
     exchange2.reset();
 
     //unbind the queue from all exchanges it knows it has been bound to:
-    queue->unbind(exchanges, queue);
+    queue->unbind(exchanges);
 
     //ensure the remaining exchanges don't still have the queue bound to them:
     FailOnDeliver deliverable;

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-ctrl
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-ctrl?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-ctrl (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-ctrl Fri Feb 18 22:38:05 2011
@@ -92,7 +92,10 @@ try:
   arguments = {}
   for a in args:
     name, val = nameval(a)
-    arguments[name] = val
+    if val[0] == '{' or val[0] == '[':
+      arguments[name] = eval(val)
+    else:
+      arguments[name] = val
   content = {
              "_object_id": {"_object_name": object_name},
              "_method_name": method_name,

Modified: qpid/trunk/qpid/cpp/src/tests/sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/sender.cpp?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/sender.cpp Fri Feb 18 22:38:05 2011
@@ -120,7 +120,7 @@ void Sender::execute(AsyncSession& sessi
     string data;
     while (getline(std::cin, data)) {
         message.setData(data);
-        message.getHeaders().setInt("SN", ++sent);
+        //message.getHeaders().setInt("SN", ++sent);
         string matchKey;
         if (lvqMatchValues && getline(lvqMatchValues, matchKey)) {
             message.getHeaders().setString(QueueOptions::strLVQMatchProperty, matchKey);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Feb 18 22:38:05 2011
@@ -712,6 +712,25 @@ public class QMFService implements Confi
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
         }
 
+        public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
+                                                                           final String type,
+                                                                           final String name,
+                                                                           final Map properties,
+                                                                           final java.lang.Boolean lenient)
+        {
+            //TODO: not implemented yet; every create request is answered with NOT_IMPLEMENTED and the arguments are ignored. NOTE(review): management-schema.xml names the last argument "strict", not "lenient" - confirm which is intended.
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
+        public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory,
+                                                                           final String type,
+                                                                           final String name,
+                                                                           final Map options)
+        {
+            //TODO: not implemented yet; every delete request is answered with NOT_IMPLEMENTED and the arguments are ignored.
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
         public UUID getId()
         {
             return _obj.getId();

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1072179&r1=1072178&r2=1072179&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb 18 22:38:05 2011
@@ -102,6 +102,19 @@
       <arg name="level"     dir="O" type="sstr"/>
     </method>
 
+    <method name="create" desc="Create an object of the specified type">
+      <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> 
+      <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> 
+      <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> 
+    </method>
+
+    <method name="delete" desc="Delete an object of the specified type">
+      <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> 
+      <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> 
+    </method>
+
   </class>
 
   <!--



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message