qpid-commits mailing list archives

From acon...@apache.org
Subject svn commit: r601807 - in /incubator/qpid/trunk/qpid/cpp: managementgen/ managementgen/templates/ src/qpid/broker/ src/qpid/management/
Date Thu, 06 Dec 2007 18:37:20 GMT
Author: aconway
Date: Thu Dec  6 10:37:18 2007
New Revision: 601807

URL: http://svn.apache.org/viewvc?rev=601807&view=rev
Log:

From Ted Ross <tross@redhat.com>

Queue statistics fixed. Additional objects added (exchange, binding).

Changes:

M cpp/src/qpid/broker/ExchangeRegistry.h
M cpp/src/qpid/broker/ExchangeRegistry.cpp

    ExchangeRegistry was modified to pass a parent pointer to created
    exchanges. This parent reference is not stored but is used to
    link management objects in a hierarchy of ownership.

M cpp/src/qpid/broker/Exchange.h
M cpp/src/qpid/broker/Exchange.cpp

    Exchange now inherits Manageable to make it visible via the
    management interface. The Exchange parent class handles most of
    the management boilerplate. A Binding struct was introduced to
    track bindings for management. This is separate from
    QueueBindings which track bindings for queues.

M cpp/src/qpid/broker/HeadersExchange.h
M cpp/src/qpid/broker/FanOutExchange.h
M cpp/src/qpid/broker/DirectExchange.h
M cpp/src/qpid/broker/TopicExchange.h
M cpp/src/qpid/broker/HeadersExchange.cpp
M cpp/src/qpid/broker/FanOutExchange.cpp
M cpp/src/qpid/broker/DirectExchange.cpp
M cpp/src/qpid/broker/TopicExchange.cpp
M cpp/src/qpid/management/ManagementExchange.cpp
M cpp/src/qpid/management/ManagementExchange.h

    Each exchange type handles management stats in its own specific
    way. Additionally, the constructors pass the management parent
    pointer to the constructor of Exchange.
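
The per-message accounting that the direct and fanout exchanges perform in
route() can be summarised with a small sketch. The ExchangeStats struct and
recordRoute function are hypothetical names used for illustration only; the
real code calls the generated inc_* accessors on mgmtExchange, and the field
names here simply echo those accessors.

```cpp
#include <cstdint>

// Hypothetical counter block; the real counters live in the generated
// management::Exchange object and are updated through inc_* methods.
struct ExchangeStats {
    std::uint64_t msgReceives = 0, byteReceives = 0;
    std::uint64_t msgDrops = 0,    byteDrops = 0;
    std::uint64_t msgRoutes = 0,   byteRoutes = 0;
};

// Account for one message of `size` bytes delivered to `matched` queues.
void recordRoute(ExchangeStats& s, std::uint32_t matched, std::uint64_t size) {
    // Every message counts as received, whether or not it was routed.
    s.msgReceives  += 1;
    s.byteReceives += size;

    if (matched == 0) {
        // No binding matched: the message is dropped.
        s.msgDrops  += 1;
        s.byteDrops += size;
    } else {
        // One route (and one copy of the bytes) per matching queue.
        s.msgRoutes  += matched;
        s.byteRoutes += static_cast<std::uint64_t>(matched) * size;
    }
}
```

A 10-byte message delivered to three queues therefore adds 3 to msgRoutes and
30 to byteRoutes, while an unroutable message only touches the receive and
drop counters.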

    An extra layer was added to contain bindings. Instead of directly
    storing bound queues, the exchanges store "bindings" which are
    manageable constructs.

M cpp/src/qpid/broker/Broker.cpp

    Broker now explicitly enables the management agent. Also sets the
    management parent (vhost) in the exchange registry.

M cpp/src/qpid/broker/Vhost.cpp

    Updated constructor to be more defensive in case the management
    agent has not been enabled.

M cpp/src/qpid/broker/Queue.cpp

    Same constructor update as vhost. Moved accounting of dequeues
    into "pop". Implemented management method handler (purge).
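
The Queue.cpp change is not visible in this part of the message, so the
following is only a sketch of the idea under stated assumptions: a single
private pop() owns the dequeue counter, and a management method handler
dispatches a purge request. The class layout, the METHOD_PURGE id, the
STATUS_OK value, and the choice to count purged messages as dequeues are all
hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>

enum Status { STATUS_OK, STATUS_UNKNOWN_METHOD };  // hypothetical status codes
enum { METHOD_PURGE = 1 };                         // hypothetical method id

class Queue {
    std::deque<std::string> messages;
    std::uint64_t dequeues;

    // Single place where messages leave the queue, so the dequeue
    // counter cannot be missed by any caller.
    void pop() {
        messages.pop_front();
        ++dequeues;
    }

public:
    Queue() : dequeues(0) {}

    void push(const std::string& m) { messages.push_back(m); }

    // Remove the oldest message into `out`; false if the queue is empty.
    bool dequeue(std::string& out) {
        if (messages.empty())
            return false;
        out = messages.front();
        pop();
        return true;
    }

    // Discard everything; assumed here to count as dequeues.
    void purge() {
        while (!messages.empty())
            pop();
    }

    // Management entry point: dispatch on the method id.
    Status managementMethod(std::uint32_t methodId) {
        switch (methodId) {
        case METHOD_PURGE:
            purge();
            return STATUS_OK;
        default:
            return STATUS_UNKNOWN_METHOD;
        }
    }

    std::uint64_t dequeueCount() const { return dequeues; }
    std::size_t   depth() const        { return messages.size(); }
};
```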

M cpp/src/qpid/broker/Deliverable.h

    A new method was added to extract the content size of the
    deliverable content (if appropriate). The method is not pure
    virtual and returns zero if not overridden.
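
A minimal sketch of the default-returning virtual described above. Only
contentSize() mirrors the commit; TextDeliverable, EmptyDeliverable and
bytesFor are hypothetical names introduced for illustration, and the real
Deliverable also declares deliverTo(), which is omitted here.

```cpp
#include <cstdint>
#include <string>

// Simplified base class: contentSize() is virtual but not pure, so
// subclasses that carry no content need not override it.
struct Deliverable {
    virtual ~Deliverable() {}
    virtual std::uint64_t contentSize() { return 0; }
};

// Hypothetical subclass that carries content and reports its size.
struct TextDeliverable : Deliverable {
    std::string body;
    explicit TextDeliverable(const std::string& b) : body(b) {}
    std::uint64_t contentSize() override { return body.size(); }
};

// Hypothetical subclass that relies on the default of zero.
struct EmptyDeliverable : Deliverable {};

// Callers such as the exchange statistics code can ask any deliverable
// for its size without knowing the concrete type.
std::uint64_t bytesFor(Deliverable& d) { return d.contentSize(); }
```

Keeping the method non-pure means existing Deliverable subclasses continue to
compile unchanged; they simply contribute zero bytes to the statistics.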

M cpp/src/qpid/broker/DeliverableMessage.h
M cpp/src/qpid/broker/TxPublish.cpp
M cpp/src/qpid/broker/DeliverableMessage.cpp
M cpp/src/qpid/broker/TxPublish.h

    These derivatives of Deliverable were updated with overrides for
    contentSize.

M cpp/src/qpid/management/ManagementAgent.h
M cpp/src/qpid/management/ManagementAgent.cpp

    An "enable" method was added to prevent inadvertent creation of a
    management agent when not desired.

    Adding and deleting management objects is now protected by a
    mutex.

    Make sure that deleted objects get reported even if neither their
    configuration nor instrumentation is changed.
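
The three ManagementAgent changes above can be sketched together. This is an
illustration under assumptions, not the committed implementation: Agent,
ManagedObject, publish and the flag names are hypothetical, std::mutex stands
in for whatever lock type the broker uses, and the real agent encodes objects
into messages rather than counting them.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical managed object with the two flags the sketch needs.
struct ManagedObject {
    bool changed = false;   // configuration or instrumentation changed
    bool deleted = false;   // the underlying resource was destroyed
};
typedef std::shared_ptr<ManagedObject> ObjectPtr;

class Agent {
public:
    // Must be called before get() will create the singleton.
    static void enable() {
        std::lock_guard<std::mutex> l(classLock());
        enabledFlag() = true;
    }

    // Returns a null pointer unless management was explicitly enabled,
    // so callers cannot create the agent by accident.
    static std::shared_ptr<Agent> get() {
        std::lock_guard<std::mutex> l(classLock());
        if (!enabledFlag())
            return std::shared_ptr<Agent>();
        std::shared_ptr<Agent>& inst = instance();
        if (!inst)
            inst.reset(new Agent());
        return inst;
    }

    // Adding objects is serialised by a mutex.
    void addObject(const ObjectPtr& o) {
        std::lock_guard<std::mutex> l(objectLock);
        objects.push_back(o);
    }

    // Report changed objects, and deleted ones even when nothing else
    // about them changed; deleted objects are then dropped.
    std::size_t publish() {
        std::lock_guard<std::mutex> l(objectLock);
        std::size_t reported = 0;
        std::vector<ObjectPtr> kept;
        for (const ObjectPtr& o : objects) {
            if (o->deleted || o->changed)
                ++reported;
            o->changed = false;
            if (!o->deleted)
                kept.push_back(o);
        }
        objects.swap(kept);
        return reported;
    }

private:
    Agent() {}
    static std::mutex& classLock() { static std::mutex m; return m; }
    static bool& enabledFlag() { static bool e = false; return e; }
    static std::shared_ptr<Agent>& instance() {
        static std::shared_ptr<Agent> i;
        return i;
    }

    std::mutex objectLock;
    std::vector<ObjectPtr> objects;
};
```

Callers such as the Vhost and Queue constructors would then test the pointer
returned by get() before registering anything, which is the defensive check
the log message describes.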

M specs/management-schema.xml

    Minor cosmetic updates. Additional parent linkage.

M cpp/managementgen/schema.py
M cpp/managementgen/templates/Class.cpp

    Added generated code to publish schema details for methods.

Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/schema.py?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/schema.py Thu Dec  6 10:37:18 2007
@@ -397,6 +397,24 @@
   def getDir (self):
     return self.dir
 
+  def genSchema (self, stream):
+    stream.write ("    ft = FieldTable ();\n")
+    stream.write ("    ft.setString (NAME,    \"" + self.name + "\");\n")
+    stream.write ("    ft.setInt    (TYPE,    TYPE_" + self.type.type.base +");\n")
+    stream.write ("    ft.setString (DIR,     \"" + self.dir + "\");\n")
+    if self.unit != None:
+      stream.write ("    ft.setString (UNIT,    \"" + self.unit   + "\");\n")
+    if self.min != None:
+      stream.write ("    ft.setInt    (MIN,     " + self.min    + ");\n")
+    if self.max != None:
+      stream.write ("    ft.setInt    (MAX,     " + self.max    + ");\n")
+    if self.maxLen != None:
+      stream.write ("    ft.setInt    (MAXLEN,  " + self.maxLen + ");\n")
+    if self.desc != None:
+      stream.write ("    ft.setString (DESC,    \"" + self.desc + "\");\n")
+    if self.default != None:
+      stream.write ("    ft.setString (DEFAULT, \"" + self.default + "\");\n")
+    stream.write ("    buf.put (ft);\n\n")
 
 #=====================================================================================
 #
@@ -455,6 +473,16 @@
       dirTag = arg.dir.lower() + "_"
       stream.write ("    " + ctype + " " + dirTag + arg.getName () + ";\n")
 
+  def genSchema (self, stream):
+    stream.write ("    ft = FieldTable ();\n")
+    stream.write ("    ft.setString (NAME,     \"" + self.name + "\");\n")
+    stream.write ("    ft.setInt    (ARGCOUNT, " + str (len (self.args)) + ");\n")
+    if self.desc != None:
+      stream.write ("    ft.setString (DESC,     \"" + self.desc + "\");\n")
+    stream.write ("    buf.put (ft);\n\n")
+    for arg in self.args:
+      arg.genSchema (stream)
+
 #=====================================================================================
 #
 #=====================================================================================
@@ -550,15 +578,6 @@
     for inst in self.instElements:
       inst.genAccessor (stream)
 
-  def genArgDeclaration (self, stream):
-    argsFound = 0
-    for method in self.methods:
-       argsFound = argsFound + len (method.args)
-    for event in self.events:
-       argsFound = argsFound + len (event.args)
-    if argsFound > 0:
-      stream.write ("FieldTable arg;");
-
   def genConfigCount (self, stream):
     stream.write ("%d" % len (self.configElements))
 
@@ -683,7 +702,8 @@
       number = number + 1
 
   def genMethodSchema (self, stream):
-    pass ###########################################################################
+    for method in self.methods:
+      method.genSchema (stream)
 
   def genNameCap (self, stream):
     stream.write (self.name.capitalize ())

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp Thu Dec  6 10:37:18 2007
@@ -53,12 +53,15 @@
     const string MAX("max");
     const string MAXLEN("maxlen");
     const string DESC("desc");
+    const string ARGCOUNT("argCount");
+    const string ARGS("args");
+    const string DIR("dir");
+    const string DEFAULT("default");
 }
 
 void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
 {
     FieldTable ft;
-    /*MGEN:Class.ArgDeclaration*/
 
     schemaNeeded = false;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Dec  6 10:37:18 2007
@@ -130,6 +130,7 @@
     sessionManager(conf.ack)
 {
     if(conf.enableMgmt){
+        ManagementAgent::enableManagement ();
         managementAgent = ManagementAgent::getAgent ();
         managementAgent->setInterval (conf.mgmtPubInterval);
 
@@ -154,7 +155,8 @@
         Vhost* vhost = new Vhost (this);
         vhostObject = Vhost::shared_ptr (vhost);
 
-        queues.setParent (vhost);
+        queues.setParent    (vhost);
+        exchanges.setParent (vhost);
     }
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -284,7 +286,6 @@
 
     case management::Broker::METHOD_JOINCLUSTER :
     case management::Broker::METHOD_LEAVECLUSTER :
-    case management::Broker::METHOD_CRASH :
         status = Manageable::STATUS_NOT_IMPLEMENTED;
         break;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Thu Dec  6 10:37:18 2007
@@ -30,6 +30,7 @@
             bool delivered;
             Deliverable() : delivered(false) {}
             virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+            virtual uint64_t contentSize() { return 0; }
             virtual ~Deliverable(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Thu Dec  6 10:37:18 2007
@@ -37,3 +37,7 @@
     return *msg;
 }
 
+uint64_t DeliverableMessage::contentSize ()
+{
+    return msg->contentSize ();
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Thu Dec  6 10:37:18 2007
@@ -33,6 +33,7 @@
             DeliverableMessage(intrusive_ptr<Message>& msg);
             virtual void deliverTo(Queue::shared_ptr& queue);
             Message& getMessage();
+            uint64_t contentSize();
             virtual ~DeliverableMessage(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Dec  6 10:37:18 2007
@@ -25,16 +25,37 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
+using qpid::management::Manageable;
 
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+                               const FieldTable& _args, Manageable* _parent) :
+    Exchange(_name, _durable, _args, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
 
 bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
     RWlock::ScopedWlock l(lock);
-    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-    std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+    std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+    std::vector<Binding::shared_ptr>::iterator i;
+
+    for (i = queues.begin(); i != queues.end(); i++)
+        if ((*i)->queue == queue)
+            break;
+
     if (i == queues.end()) {
-        bindings[routingKey].push_back(queue);
+        Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+        bindings[routingKey].push_back(binding);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->inc_bindings ();
+        }
         return true;
     } else{
         return false;
@@ -43,14 +64,21 @@
 
 bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     RWlock::ScopedWlock l(lock);
-    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+    std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+    std::vector<Binding::shared_ptr>::iterator i;
+
+    for (i = queues.begin(); i != queues.end(); i++)
+        if ((*i)->queue == queue)
+            break;
 
-    std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
     if (i < queues.end()) {
         queues.erase(i);
-        if(queues.empty()){
+        if (queues.empty()) {
             bindings.erase(routingKey);
         }
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->dec_bindings ();
+        }
         return true;
     } else {
         return false;
@@ -59,38 +87,65 @@
 
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
     RWlock::ScopedRlock l(lock);
-    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+    std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+    std::vector<Binding::shared_ptr>::iterator i;
     int count(0);
-    for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
-        msg.deliverTo(*i);
-    }
+
+    for(i = queues.begin(); i != queues.end(); i++, count++) {
+        msg.deliverTo((*i)->queue);
+        if ((*i)->mgmtBinding.get() != 0)
+            (*i)->mgmtBinding->inc_msgMatched ();
+     }
+
     if(!count){
         QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->inc_msgDrops  ();
+            mgmtExchange->inc_byteDrops (msg.contentSize ());
+        }
+    }
+    else {
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->inc_msgRoutes  (count);
+            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+        }
+    }
+
+    if (mgmtExchange.get() != 0) {
+        mgmtExchange->inc_msgReceives  ();
+        mgmtExchange->inc_byteReceives (msg.contentSize ());
     }
 }
 
 
 bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
 {
+    std::vector<Binding::shared_ptr>::iterator j;
+
     if (routingKey) {
         Bindings::iterator i = bindings.find(*routingKey);
-        return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+
+        if (i == bindings.end())
+            return false;
+        if (!queue)
+            return true;
+        for (j = i->second.begin(); j != i->second.end(); j++)
+            if ((*j)->queue == queue)
+                return true;
     } else if (!queue) {
         //if no queue or routing key is specified, just report whether any bindings exist
         return bindings.size() > 0;
     } else {
-        for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
-            if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
-                return true;
-            }
-        }
+        for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+            for (j = i->second.begin(); j != i->second.end(); j++)
+                if ((*j)->queue == queue)
+                    return true;
         return false;
     }
-}
-
-DirectExchange::~DirectExchange(){
 
+    return false;
 }
 
+DirectExchange::~DirectExchange() {}
 
 const std::string DirectExchange::typeName("direct");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu Dec  6 10:37:18 2007
@@ -31,17 +31,17 @@
 namespace qpid {
 namespace broker {
     class DirectExchange : public virtual Exchange{
-        typedef std::vector<Queue::shared_ptr> Queues;
-        typedef std::map<string, Queues > Bindings;
+        typedef std::vector<Binding::shared_ptr> Queues;
+        typedef std::map<string, Queues> Bindings;
         Bindings bindings;
         qpid::sys::RWlock lock;
 
     public:
         static const std::string typeName;
         
-        DirectExchange(const std::string& name);
+        DirectExchange(const std::string& name, management::Manageable* parent = 0);
         DirectExchange(const string& _name, bool _durable, 
-                       const qpid::framing::FieldTable& _args);
+                       const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
 
         virtual std::string getType() const { return typeName; }            
         

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Dec  6 10:37:18 2007
@@ -21,10 +21,52 @@
 
 #include "Exchange.h"
 #include "ExchangeRegistry.h"
+#include "qpid/management/ManagementAgent.h"
 
 using namespace qpid::broker;
 using qpid::framing::Buffer;
 using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+Exchange::Exchange (const string& _name, Manageable* parent) :
+    name(_name), durable(false), persistenceId(0)
+{
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+        if (agent.get () != 0)
+        {
+            mgmtExchange = management::Exchange::shared_ptr
+                (new management::Exchange (this, parent, _name));
+            agent->addObject (mgmtExchange);
+        }
+    }
+}
+
+Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+                   Manageable* parent)
+    : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0)
+{
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+        if (agent.get () != 0)
+        {
+            mgmtExchange = management::Exchange::shared_ptr
+                (new management::Exchange (this, parent, _name));
+            agent->addObject (mgmtExchange);
+        }
+    }
+}
+
+Exchange::~Exchange ()
+{
+    if (mgmtExchange.get () != 0)
+        mgmtExchange->resourceDestroy ();
+}
 
 Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
 {
@@ -56,5 +98,43 @@
         + args.size(); 
 }
 
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
+{
+    return dynamic_pointer_cast<ManagementObject> (mgmtExchange);
+}
 
+Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent)
+    : queue(_queue), key(_key)
+{
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+        if (agent.get() != 0)
+        {
+            ManagementObject::shared_ptr mo = queue->GetManagementObject();
+            if (mo.get() != 0)
+            {
+                uint64_t queueId = mo->getObjectId();
+                mgmtBinding = management::Binding::shared_ptr
+                    (new management::Binding (this, (Manageable*) parent, queueId, key));
+                agent->addObject (mgmtBinding);
+            }
+        }
+    }
+}
+
+Exchange::Binding::~Binding ()
+{
+    if (mgmtBinding.get () != 0)
+        mgmtBinding->resourceDestroy ();
+}
 
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
+{
+    return dynamic_pointer_cast<ManagementObject> (mgmtBinding);
+}
+
+Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&)
+{
+    return Manageable::STATUS_UNKNOWN_METHOD;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Dec  6 10:37:18 2007
@@ -28,13 +28,16 @@
 #include "MessageStore.h"
 #include "PersistableExchange.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Exchange.h"
+#include "qpid/management/Binding.h"
 
 namespace qpid {
     namespace broker {
         using std::string;
         class ExchangeRegistry;
 
-        class Exchange : public PersistableExchange{
+        class Exchange : public PersistableExchange, public management::Manageable {
         private:
             const string name;
             const bool durable;
@@ -43,13 +46,31 @@
             uint32_t alternateUsers;
             mutable uint64_t persistenceId;
 
+        protected:
+            struct Binding : public management::Manageable {
+                typedef boost::shared_ptr<Binding>       shared_ptr;
+                typedef std::vector<Binding::shared_ptr> vector;
+
+                Queue::shared_ptr               queue;
+                const std::string               key;
+                const qpid::framing::FieldTable args;
+                management::Binding::shared_ptr mgmtBinding;
+
+                Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0);
+                ~Binding ();
+                management::ManagementObject::shared_ptr GetManagementObject () const;
+                management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
+            };
+
+            management::Exchange::shared_ptr mgmtExchange;
+
         public:
             typedef boost::shared_ptr<Exchange> shared_ptr;
 
-            explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
-            Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) 
-                : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
-            virtual ~Exchange(){}
+            explicit Exchange(const string& name, management::Manageable* parent = 0);
+            Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+                     management::Manageable* parent = 0);
+            virtual ~Exchange();
 
             const string& getName() const { return name; }
             bool isDurable() { return durable; }
@@ -75,6 +96,10 @@
 
             static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
 
+            // Manageable entry points
+            management::ManagementObject::shared_ptr GetManagementObject (void) const;
+            management::Manageable::status_t
+                ManagementMethod (uint32_t, management::Args&) { return management::Manageable::STATUS_UNKNOWN_METHOD; }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Dec  6 10:37:18 2007
@@ -46,15 +46,15 @@
         Exchange::shared_ptr exchange;
 
         if(type == TopicExchange::typeName){
-            exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
+            exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent));
         }else if(type == DirectExchange::typeName){
-            exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
+            exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent));
         }else if(type == FanOutExchange::typeName){
-            exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
+            exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent));
         }else if (type == HeadersExchange::typeName) {
-            exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
+            exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
         }else if (type == ManagementExchange::typeName) {
-            exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args));
+            exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
         }else{
             throw UnknownExchangeTypeException();    
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Dec  6 10:37:18 2007
@@ -27,6 +27,7 @@
 #include "MessageStore.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/management/Manageable.h"
 
 namespace qpid {
 namespace broker {
@@ -36,7 +37,9 @@
         typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
         ExchangeMap exchanges;
         qpid::sys::RWlock lock;
+        management::Manageable* parent;
      public:
+        ExchangeRegistry () : parent(0) {}
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
             throw(UnknownExchangeTypeException);
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, 
@@ -45,6 +48,11 @@
         void destroy(const std::string& name);
         Exchange::shared_ptr get(const std::string& name);
         Exchange::shared_ptr getDefault();
+
+        /**
+         * Register the manageable parent for declared queues
+         */
+        void setParent (management::Manageable* _parent) { parent = _parent; }
     };
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Dec  6 10:37:18 2007
@@ -25,15 +25,36 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
+    Exchange(_name, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
+
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
+                               const FieldTable& _args, Manageable* _parent) :
+    Exchange(_name, _durable, _args, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
 
 bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
     RWlock::ScopedWlock locker(lock);
+    std::vector<Binding::shared_ptr>::iterator i;
+
     // Add if not already present.
-    Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+    for (i = bindings.begin (); i != bindings.end(); i++)
+        if ((*i)->queue == queue)
+            break;
+
     if (i == bindings.end()) {
-        bindings.push_back(queue);
+        Binding::shared_ptr binding (new Binding ("", queue, this));
+        bindings.push_back(binding);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->inc_bindings ();
+        }
         return true;
     } else {
         return false;
@@ -42,9 +63,17 @@
 
 bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
     RWlock::ScopedWlock locker(lock);
-    Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+    std::vector<Binding::shared_ptr>::iterator i;
+
+    for (i = bindings.begin (); i != bindings.end(); i++)
+        if ((*i)->queue == queue)
+            break;
+
     if (i != bindings.end()) {
         bindings.erase(i);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->dec_bindings ();
+        }
         return true;
     } else {
         return false;
@@ -53,14 +82,40 @@
 
 void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
     RWlock::ScopedRlock locker(lock);
-    for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
-        msg.deliverTo(*i);
+    uint32_t count(0);
+
+    for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+        msg.deliverTo((*i)->queue);
+        if ((*i)->mgmtBinding.get() != 0)
+            (*i)->mgmtBinding->inc_msgMatched ();
+    }
+
+    if (mgmtExchange.get() != 0)
+    {
+        mgmtExchange->inc_msgReceives  ();
+        mgmtExchange->inc_byteReceives (msg.contentSize ());
+        if (count == 0)
+        {
+            mgmtExchange->inc_msgDrops  ();
+            mgmtExchange->inc_byteDrops (msg.contentSize ());
+        }
+        else
+        {
+            mgmtExchange->inc_msgRoutes  (count);
+            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+        }
     }
 }
 
 bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
 {
-    return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+    std::vector<Binding::shared_ptr>::iterator i;
+
+    for (i = bindings.begin (); i != bindings.end(); i++)
+        if ((*i)->queue == queue)
+            break;
+
+    return i != bindings.end();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu Dec  6 10:37:18 2007
@@ -32,15 +32,16 @@
 namespace broker {
 
 class FanOutExchange : public virtual Exchange {
-    std::vector<Queue::shared_ptr> bindings;
+    std::vector<Binding::shared_ptr> bindings;
     qpid::sys::RWlock lock;
 
   public:
     static const std::string typeName;
         
-    FanOutExchange(const std::string& name);
+    FanOutExchange(const std::string& name, management::Manageable* parent = 0);
     FanOutExchange(const string& _name, bool _durable, 
-                   const qpid::framing::FieldTable& _args);
+                   const qpid::framing::FieldTable& _args,
+                   management::Manageable* parent = 0);
 
     virtual std::string getType() const { return typeName; }            
         

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Dec  6 10:37:18 2007
@@ -40,19 +40,40 @@
     const std::string x_match("x-match");
 }
 
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
+    Exchange(_name, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
+
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
+                                 const FieldTable& _args, Manageable* _parent) :
+    Exchange(_name, _durable, _args, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
 
 bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
     RWlock::ScopedWlock locker(lock);
     FieldTable::ValuePtr what = args->get(x_match);
     if (!what || (*what != all && *what != any)) 
         throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
-    Binding binding(*args, queue);
-    Bindings::iterator i =
-        std::find(bindings.begin(),bindings.end(), binding);
+    Bindings::iterator i;
+
+    for (i = bindings.begin(); i != bindings.end(); i++)
+        if (i->first == *args && i->second->queue == queue)
+            break;
+
     if (i == bindings.end()) {
-        bindings.push_back(binding);
+        Binding::shared_ptr binding (new Binding ("", queue, this));
+        HeaderMap headerMap(*args, binding);
+
+        bindings.push_back(headerMap);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->inc_bindings ();
+        }
         return true;
     } else {
         return false;
@@ -61,10 +82,16 @@
 
 bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
     RWlock::ScopedWlock locker(lock);
-    Bindings::iterator i =
-        std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+    Bindings::iterator i;
+    for (i = bindings.begin(); i != bindings.end(); i++)
+        if (i->first == *args && i->second->queue == queue)
+            break;
+
     if (i != bindings.end()) {
         bindings.erase(i);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->dec_bindings ();
+        }
         return true;
     } else {
         return false;
@@ -73,9 +100,29 @@
 
 
 void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
-    RWlock::ScopedRlock locker(lock);;
-    for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if (match(i->first, *args)) msg.deliverTo(i->second);
+    RWlock::ScopedRlock locker(lock);
+    uint32_t count(0);
+
+    for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+        if (!match(i->first, *args)) continue;  // count and credit matching bindings only
+        msg.deliverTo(i->second->queue); count++;
+        if (i->second->mgmtBinding.get() != 0)
+            i->second->mgmtBinding->inc_msgMatched ();
+    }
+
+    if (mgmtExchange.get() != 0)
+    {
+        mgmtExchange->inc_msgReceives  ();
+        mgmtExchange->inc_byteReceives (msg.contentSize ());
+        if (count == 0)
+        {
+            mgmtExchange->inc_msgDrops  ();
+            mgmtExchange->inc_byteDrops (msg.contentSize ());
+        }
+        else
+        {
+            mgmtExchange->inc_msgRoutes  (count);
+            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+        }
     }
 }
 
@@ -83,7 +130,7 @@
 bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
 {
     for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) {
+        if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
             return true;
         }
     }
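
The route() methods in this commit all follow the same accounting rule: every message counts as one receive, then either as one drop (no binding took it) or as one route per delivery. The following is a minimal sketch of that rule, not the broker code. ExchangeStats, BindingSketch and routeAndCount are hypothetical names standing in for the generated management::Exchange counters and the exchange-specific match test.

```cpp
#include <cassert>
#include <cstddef>
#include <stdint.h>
#include <vector>

// Hypothetical stand-in for the generated management::Exchange counters.
struct ExchangeStats {
    uint64_t msgReceives = 0, byteReceives = 0;
    uint64_t msgDrops = 0, byteDrops = 0;
    uint64_t msgRoutes = 0, byteRoutes = 0;
};

// Hypothetical binding: 'matches' stands in for the exchange-specific test
// (always true for fanout, header or topic matching otherwise).
struct BindingSketch {
    bool matches;
    uint64_t msgMatched;
};

// Account for one message of 'size' bytes.
// Returns the number of bindings the message was routed to.
inline uint32_t routeAndCount(std::vector<BindingSketch>& bindings,
                              uint64_t size, ExchangeStats& stats)
{
    uint32_t count = 0;
    for (std::size_t i = 0; i < bindings.size(); ++i) {
        if (!bindings[i].matches) continue;   // only matching bindings count
        ++bindings[i].msgMatched;
        ++count;
    }
    assert(count <= bindings.size());

    ++stats.msgReceives;
    stats.byteReceives += size;
    if (count == 0) {                         // nothing matched: a drop
        ++stats.msgDrops;
        stats.byteDrops += size;
    } else {                                  // one route per delivery
        stats.msgRoutes += count;
        stats.byteRoutes += count * size;
    }
    return count;
}
```

Incrementing the delivery count inside the match test, rather than in the loop header, keeps a message that matches no binding from being reported as routed.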

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu Dec  6 10:37:18 2007
@@ -32,8 +32,8 @@
 
 
 class HeadersExchange : public virtual Exchange {    
-    typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
-    typedef std::vector<Binding> Bindings;
+    typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
+    typedef std::vector<HeaderMap> Bindings;
 
     Bindings bindings;
     qpid::sys::RWlock lock;
@@ -41,9 +41,10 @@
   public:
     static const std::string typeName;
 
-    HeadersExchange(const string& name);
+    HeadersExchange(const string& name, management::Manageable* parent = 0);
     HeadersExchange(const string& _name, bool _durable, 
-                    const qpid::framing::FieldTable& _args);
+                    const qpid::framing::FieldTable& _args,
+                    management::Manageable* parent = 0);
     
     virtual std::string getType() const { return typeName; }            
         

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Dec  6 10:37:18 2007
@@ -59,11 +59,14 @@
 {
     if (parent != 0)
     {
-        mgmtObject = management::Queue::shared_ptr
-            (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
-
         ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-        agent->addObject (mgmtObject);
+
+        if (agent.get () != 0)
+        {
+            mgmtObject = management::Queue::shared_ptr
+                (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
+            agent->addObject (mgmtObject);
+        }
     }
 }
 
@@ -93,14 +96,14 @@
         if (!enqueue(0, msg)){
             push(msg);
             msg->enqueueComplete();
-            if (mgmtObject != 0) {
+            if (mgmtObject.get() != 0) {
                 mgmtObject->inc_msgTotalEnqueues ();
                 mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
                 mgmtObject->inc_msgDepth ();
                 mgmtObject->inc_byteDepth (msg->contentSize ());
             }
         }else {
-            if (mgmtObject != 0) {
+            if (mgmtObject.get() != 0) {
                 mgmtObject->inc_msgTotalEnqueues ();
                 mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
                 mgmtObject->inc_msgDepth ();
@@ -118,7 +121,7 @@
 void Queue::recover(intrusive_ptr<Message>& msg){
     push(msg);
     msg->enqueueComplete(); // mark the message as enqueued
-    if (mgmtObject != 0) {
+    if (mgmtObject.get() != 0) {
         mgmtObject->inc_msgTotalEnqueues ();
         mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
         mgmtObject->inc_msgPersistEnqueues ();
@@ -136,7 +139,7 @@
 
 void Queue::process(intrusive_ptr<Message>& msg){
     push(msg);
-    if (mgmtObject != 0) {
+    if (mgmtObject.get() != 0) {
         mgmtObject->inc_msgTotalEnqueues ();
         mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
         mgmtObject->inc_msgTxnEnqueues ();
@@ -319,7 +322,7 @@
     }
     consumerCount++;
 
-    if (mgmtObject != 0){
+    if (mgmtObject.get() != 0){
         mgmtObject->inc_consumers ();
     }
 }
@@ -329,7 +332,7 @@
     Mutex::ScopedLock locker(consumerLock);
     consumerCount--;
     if(exclusive) exclusive = false;
-    if (mgmtObject != 0){
+    if (mgmtObject.get() != 0){
         mgmtObject->dec_consumers ();
     }
 }
@@ -341,16 +344,6 @@
     if(!messages.empty()){
         msg = messages.front();
         pop();
-        if (mgmtObject != 0){
-            mgmtObject->inc_msgTotalDequeues ();
-            //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
-            mgmtObject->dec_msgDepth ();
-            //mgmtObject->dec_byteDepth (msg->contentSize ());
-            if (0){//msg->isPersistent ()) {
-                mgmtObject->inc_msgPersistDequeues ();
-                //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
-            }
-        }
     }
     return msg;
 }
@@ -366,7 +359,19 @@
  * Assumes messageLock is held
  */
 void Queue::pop(){
-    if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
+    QueuedMessage& msg = messages.front();
+
+    if (policy.get()) policy->dequeued(msg.payload->contentSize());
+    if (mgmtObject.get() != 0){
+        mgmtObject->inc_msgTotalDequeues  ();
+        mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+        mgmtObject->dec_msgDepth ();
+        mgmtObject->dec_byteDepth (msg.payload->contentSize());
+        if (msg.payload->isPersistent ()){
+            mgmtObject->inc_msgPersistDequeues ();
+            mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
+        }
+    }
     messages.pop_front();
 }
 
@@ -473,7 +478,8 @@
     }
 }
 
-void Queue::bound(const string& exchange, const string& key, const FieldTable& args)
+void Queue::bound(const string& exchange, const string& key,
+                  const FieldTable& args)
 {
     bindings.add(exchange, key, args);
 }
@@ -584,8 +590,24 @@
     return dynamic_pointer_cast<ManagementObject> (mgmtObject);
 }
 
-Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
                                               Args&    /*args*/)
 {
-    return Manageable::STATUS_OK;
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+    QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+    switch (methodId)
+    {
+    case management::Queue::METHOD_PURGE :
+        purge ();
+        status = Manageable::STATUS_OK;
+        break;
+
+    case management::Queue::METHOD_INCREASEJOURNALSIZE :
+        status = Manageable::STATUS_NOT_IMPLEMENTED;
+        break;
+    }
+
+    return status;
 }
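
The new Queue::ManagementMethod starts from STATUS_UNKNOWN_METHOD and only changes the status for method IDs it recognises. A minimal sketch of that dispatch pattern follows. The enum values and the QueueSketch type are hypothetical; the real constants come from Manageable and the generated management::Queue class, which this diff does not show.

```cpp
#include <cassert>
#include <stdint.h>

// Hypothetical values, chosen for illustration only.
enum Status { STATUS_OK, STATUS_UNKNOWN_METHOD, STATUS_NOT_IMPLEMENTED };
enum MethodId { METHOD_PURGE = 1, METHOD_INCREASEJOURNALSIZE = 2 };

struct QueueSketch {
    uint32_t depth;   // number of queued messages
    explicit QueueSketch(uint32_t d) : depth(d) {}

    void purge() { depth = 0; }

    Status managementMethod(uint32_t methodId) {
        Status status = STATUS_UNKNOWN_METHOD;   // default for unknown IDs
        switch (methodId) {
        case METHOD_PURGE:
            purge();
            status = STATUS_OK;
            break;
        case METHOD_INCREASEJOURNALSIZE:
            status = STATUS_NOT_IMPLEMENTED;     // recognised, no handler yet
            break;
        }
        // In this sketch only purge reports success, so OK implies empty.
        assert(status != STATUS_OK || depth == 0);
        return status;
    }
};
```

Defaulting to the "unknown" status means a method ID added to the schema later is reported as unsupported instead of silently succeeding, which is what the previous unconditional STATUS_OK did.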

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Dec  6 10:37:18 2007
@@ -115,9 +115,19 @@
     return do_match(begin(), end(), target.begin(), target.end());
 }
 
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
 
+TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+                             const FieldTable& _args, Manageable* _parent) :
+    Exchange(_name, _durable, _args, _parent)
+{
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->set_type (typeName);
+}
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     RWlock::ScopedWlock l(lock);
@@ -125,7 +135,11 @@
     if (isBound(queue, routingPattern)) {
         return false;
     } else {
-        bindings[routingPattern].push_back(queue);
+        Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+        bindings[routingPattern].push_back(binding);
+        if (mgmtExchange.get() != 0) {
+            mgmtExchange->inc_bindings ();
+        }
         return true;
     }
 }
@@ -133,12 +147,19 @@
 bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     RWlock::ScopedWlock l(lock);
     BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
-    Queue::vector& qv(bi->second);
     if (bi == bindings.end()) return false;
+    Binding::vector& qv(bi->second);
-    Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+
+    Binding::vector::iterator q;
+    for (q = qv.begin(); q != qv.end(); q++)
+        if ((*q)->queue == queue)
+            break;
     if(q == qv.end()) return false;
     qv.erase(q);
     if(qv.empty()) bindings.erase(bi);
+    if (mgmtExchange.get() != 0) {
+        mgmtExchange->dec_bindings ();
+    }
     return true;
 }
 
@@ -146,21 +167,45 @@
 {
     BindingMap::iterator bi = bindings.find(pattern);
     if (bi == bindings.end()) return false;
-    Queue::vector& qv(bi->second);
-    return find(qv.begin(), qv.end(), queue) != qv.end();
+    Binding::vector& qv(bi->second);
+    Binding::vector::iterator q;
+    for (q = qv.begin(); q != qv.end(); q++)
+        if ((*q)->queue == queue)
+            break;
+    return q != qv.end();
 }
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
     RWlock::ScopedRlock l(lock);
+    uint32_t count(0);
     Tokens   tokens(routingKey);
+
     for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
         if (i->first.match(tokens)) {
-            Queue::vector& qv(i->second);
-            for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
-                msg.deliverTo(*j);
+            Binding::vector& qv(i->second);
+            for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
+                msg.deliverTo((*j)->queue);
+                if ((*j)->mgmtBinding.get() != 0)
+                    (*j)->mgmtBinding->inc_msgMatched ();
             }
         }
     }
+
+    if (mgmtExchange.get() != 0)
+    {
+        mgmtExchange->inc_msgReceives  ();
+        mgmtExchange->inc_byteReceives (msg.contentSize ());
+        if (count == 0)
+        {
+            mgmtExchange->inc_msgDrops  ();
+            mgmtExchange->inc_byteDrops (msg.contentSize ());
+        }
+        else
+        {
+            mgmtExchange->inc_msgRoutes  (count);
+            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+        }
+    }
 }
 
 bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
@@ -176,16 +221,16 @@
                 return true;
             }
         }            
-        return false;
     } else {
         for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-            Queue::vector& qv(i->second);
-            if (find(qv.begin(), qv.end(), queue) != qv.end()) {
-                return true;
-            }
+            Binding::vector& qv(i->second);
+            Binding::vector::iterator q;
+            for (q = qv.begin(); q != qv.end(); q++)
+                if ((*q)->queue == queue)
+                    return true;
         }
-        return false;
     }
+    return false;
 }
 
 TopicExchange::~TopicExchange() {}
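
Because the exchanges now store Binding pointers instead of queues, std::find on the queue no longer works and the commit replaces it with hand-written loops. One possible alternative, sketched here under assumptions, is std::find_if with a small predicate. Queue, Binding, BoundTo, findBinding and unbindQueue are simplified, hypothetical stand-ins, and std::shared_ptr is used in place of the boost::shared_ptr of the real code.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical, simplified stand-ins for broker::Queue and the Binding struct.
struct Queue { };
typedef std::shared_ptr<Queue> QueuePtr;

struct Binding {
    QueuePtr queue;
    explicit Binding(const QueuePtr& q) : queue(q) {}
};
typedef std::shared_ptr<Binding> BindingPtr;
typedef std::vector<BindingPtr> BindingVector;

// Predicate: true for a binding that refers to the given queue.
struct BoundTo {
    QueuePtr queue;
    explicit BoundTo(const QueuePtr& q) : queue(q) {}
    bool operator()(const BindingPtr& b) const {
        assert(b.get() != 0);
        return b->queue == queue;
    }
};

// Returns an iterator to the binding for 'q', or v.end() if there is none.
inline BindingVector::iterator findBinding(BindingVector& v, const QueuePtr& q) {
    return std::find_if(v.begin(), v.end(), BoundTo(q));
}

// Removes the binding for 'q', if any; returns whether one was removed.
inline bool unbindQueue(BindingVector& v, const QueuePtr& q) {
    BindingVector::iterator i = findBinding(v, q);
    if (i == v.end()) return false;   // check before touching the element
    v.erase(i);
    return true;
}
```

A single helper of this kind could serve bind, unbind and isBound alike, so the lookup rule lives in one place.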

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu Dec  6 10:37:18 2007
@@ -71,7 +71,7 @@
 };
 
 class TopicExchange : public virtual Exchange{
-    typedef std::map<TopicPattern, Queue::vector> BindingMap;
+    typedef std::map<TopicPattern, Binding::vector> BindingMap;
     BindingMap bindings;
     qpid::sys::RWlock lock;
 
@@ -79,9 +79,9 @@
   public:
     static const std::string typeName;
 
-    TopicExchange(const string& name);
+    TopicExchange(const string& name, management::Manageable* parent = 0);
     TopicExchange(const string& _name, bool _durable, 
-                  const qpid::framing::FieldTable& _args);
+                  const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
 
     virtual std::string getType() const { return typeName; }            
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Thu Dec  6 10:37:18 2007
@@ -67,3 +67,7 @@
     queue->process(msg);
 }
 
+uint64_t TxPublish::contentSize ()
+{
+    return msg->contentSize ();
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Thu Dec  6 10:37:18 2007
@@ -66,10 +66,12 @@
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
-
+            
             virtual void deliverTo(Queue::shared_ptr& queue);
 
             virtual ~TxPublish(){}
+
+            uint64_t contentSize();
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp Thu Dec  6 10:37:18 2007
@@ -27,11 +27,14 @@
 {
     if (parentBroker != 0)
     {
-        mgmtObject = management::Vhost::shared_ptr
-            (new management::Vhost (this, parentBroker, "/"));
-
         ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-        agent->addObject (mgmtObject);
+
+        if (agent.get () != 0)
+        {
+            mgmtObject = management::Vhost::shared_ptr
+                (new management::Vhost (this, parentBroker, "/"));
+            agent->addObject (mgmtObject);
+        }
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Dec  6 10:37:18 2007
@@ -33,6 +33,7 @@
 using namespace qpid::sys;
 
 ManagementAgent::shared_ptr ManagementAgent::agent;
+bool                        ManagementAgent::enabled = 0;
 
 ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
 {
@@ -40,16 +41,21 @@
     nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
 }
 
+void ManagementAgent::enableManagement (void)
+{
+    enabled = 1;
+}
+
 ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
 {
-    if (agent.get () == 0)
+    if (enabled && agent.get () == 0)
         agent = shared_ptr (new ManagementAgent (10));
 
     return agent;
 }
 
-void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
-                                   Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
+                                   broker::Exchange::shared_ptr _dexchange)
 {
     mExchange = _mexchange;
     dExchange = _dexchange;
@@ -57,6 +63,7 @@
 
 void ManagementAgent::addObject (ManagementObject::shared_ptr object)
 {
+    RWlock::ScopedWlock writeLock (userLock);
     uint64_t objectId = nextObjectId++;
 
     object->setObjectId (objectId);
@@ -74,6 +81,8 @@
 
 void ManagementAgent::clientAdded (void)
 {
+    RWlock::ScopedRlock readLock (userLock);
+
     for (ManagementObjectMap::iterator iter = managementObjects.begin ();
          iter != managementObjects.end ();
          iter++)
@@ -94,7 +103,7 @@
 
 void ManagementAgent::SendBuffer (Buffer&  buf,
                                   uint32_t length,
-                                  Exchange::shared_ptr exchange,
+                                  broker::Exchange::shared_ptr exchange,
                                   string   routingKey)
 {
     intrusive_ptr<Message> msg (new Message ());
@@ -129,9 +138,10 @@
 {
 #define BUFSIZE   65536
 #define THRESHOLD 16384
-    char      msgChars[BUFSIZE];
-    uint32_t  contentSize;
-    string    routingKey;
+    RWlock::ScopedWlock writeLock (userLock);
+    char                msgChars[BUFSIZE];
+    uint32_t            contentSize;
+    string              routingKey;
     std::list<uint64_t> deleteList;
 
     if (managementObjects.empty ())
@@ -157,7 +167,7 @@
             SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
 
-        if (object->getConfigChanged ())
+        if (object->getConfigChanged () || object->isDeleted ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
             EncodeHeader (msgBuffer);
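
With this change getAgent() may return a null pointer when management has not been enabled, which is why Queue.cpp and Vhost.cpp now test the agent before creating their management objects. The following is a minimal sketch of that contract, not the real class. AgentSketch and registerIfManaged are hypothetical names, std::shared_ptr stands in for boost::shared_ptr, and the sketch makes no attempt at thread safety.

```cpp
#include <cassert>
#include <memory>

// Hypothetical miniature of the agent.
class AgentSketch {
  public:
    typedef std::shared_ptr<AgentSketch> shared_ptr;

    static void enableManagement() { enabled() = true; }

    // Returns a null pointer until enableManagement() has been called;
    // after that, creates the single agent on first use.
    static shared_ptr getAgent() {
        if (enabled() && !instance())
            instance() = shared_ptr(new AgentSketch());
        return instance();
    }

    int objects;   // number of registered objects, for illustration only
    void addObject() { ++objects; }

  private:
    AgentSketch() : objects(0) {}
    static bool& enabled() { static bool e = false; return e; }
    static shared_ptr& instance() { static shared_ptr p; return p; }
};

// Caller-side pattern: register only when an agent exists.
inline bool registerIfManaged() {
    AgentSketch::shared_ptr agent = AgentSketch::getAgent();
    if (agent.get() == 0) return false;   // management disabled
    agent->addObject();
    return true;
}
```

Callers that skip the null test would dereference an empty pointer whenever management is disabled, so the check belongs at every call site of getAgent().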

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Dec  6 10:37:18 2007
@@ -25,6 +25,7 @@
 #include "qpid/Options.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Timer.h"
+#include "qpid/sys/Mutex.h"
 #include "ManagementObject.h"
 #include <boost/shared_ptr.hpp>
 
@@ -41,11 +42,12 @@
 
     typedef boost::shared_ptr<ManagementAgent> shared_ptr;
 
+    static void       enableManagement (void);
     static shared_ptr getAgent (void);
 
     void setInterval     (uint16_t _interval) { interval = _interval; }
-    void setExchange     (broker::Exchange::shared_ptr  mgmtExchange,
-                          broker::Exchange::shared_ptr  directExchange);
+    void setExchange     (broker::Exchange::shared_ptr mgmtExchange,
+                          broker::Exchange::shared_ptr directExchange);
     void addObject       (ManagementObject::shared_ptr object);
     void clientAdded     (void);
     void dispatchCommand (broker::Deliverable&             msg,
@@ -64,6 +66,9 @@
     };
 
     static shared_ptr            agent;
+    static bool                  enabled;
+
+    qpid::sys::RWlock            userLock;
     ManagementObjectMap          managementObjects;
     broker::Timer                timer;
     broker::Exchange::shared_ptr mExchange;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp Thu Dec  6 10:37:18 2007
@@ -27,13 +27,14 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-ManagementExchange::ManagementExchange (const string& _name) :
-    Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) :
+    Exchange (_name, _parent), TopicExchange(_name, _parent) {}
 ManagementExchange::ManagementExchange (const std::string& _name,
                                         bool               _durable,
-                                        const FieldTable&  _args) :
-    Exchange     (_name, _durable, _args),
-    TopicExchange(_name, _durable, _args) {}
+                                        const FieldTable&  _args,
+                                        Manageable*        _parent) :
+    Exchange (_name, _durable, _args, _parent), 
+    TopicExchange(_name, _durable, _args, _parent) {}
 
 
 bool ManagementExchange::bind (Queue::shared_ptr queue,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h Thu Dec  6 10:37:18 2007
@@ -35,9 +35,10 @@
   public:
     static const std::string typeName;
 
-    ManagementExchange (const string& name);
+    ManagementExchange (const string& name, Manageable* _parent = 0);
     ManagementExchange (const string& _name, bool _durable, 
-                        const qpid::framing::FieldTable& _args);
+                        const qpid::framing::FieldTable& _args,
+                        Manageable* _parent = 0);
 
     virtual std::string getType() const { return typeName; }            
 


