qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r590806 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/broker/management/
Date Wed, 31 Oct 2007 20:06:07 GMT
Author: cctrieloff
Date: Wed Oct 31 13:06:05 2007
New Revision: 590806

URL: http://svn.apache.org/viewvc?rev=590806&view=rev
Log:

Patch from Ted
QPID-668

This patch does two things:

1) Adds management objects for "broker" and "virtual host".
2) Moves all management-related source files from qpid/broker to qpid/broker/management. 


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Oct 31 13:06:05 2007
@@ -159,10 +159,12 @@
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/IncomingExecutionContext.cpp \
-  qpid/broker/ManagementAgent.cpp \
-  qpid/broker/ManagementExchange.cpp \
-  qpid/broker/ManagementObject.cpp \
-  qpid/broker/ManagementObjectQueue.cpp \
+  qpid/broker/management/ManagementAgent.cpp \
+  qpid/broker/management/ManagementExchange.cpp \
+  qpid/broker/management/ManagementObject.cpp \
+  qpid/broker/management/ManagementObjectBroker.cpp \
+  qpid/broker/management/ManagementObjectQueue.cpp \
+  qpid/broker/management/ManagementObjectVhost.cpp \
   qpid/broker/Message.cpp \
   qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
@@ -256,10 +258,12 @@
   qpid/broker/HandlerImpl.h \
   qpid/broker/HeadersExchange.h \
   qpid/broker/IncomingExecutionContext.h \
-  qpid/broker/ManagementAgent.h \
-  qpid/broker/ManagementExchange.h \
-  qpid/broker/ManagementObject.h \
-  qpid/broker/ManagementObjectQueue.h \
+  qpid/broker/management/ManagementAgent.h \
+  qpid/broker/management/ManagementExchange.h \
+  qpid/broker/management/ManagementObject.h \
+  qpid/broker/management/ManagementObjectBroker.h \
+  qpid/broker/management/ManagementObjectQueue.h \
+  qpid/broker/management/ManagementObjectVhost.h \
   qpid/broker/Message.h \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Oct 31 13:06:05 2007
@@ -28,7 +28,7 @@
 #include "NullMessageStore.h"
 #include "RecoveryManagerImpl.h"
 #include "TopicExchange.h"
-#include "ManagementExchange.h"
+#include "management/ManagementExchange.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
@@ -125,6 +125,14 @@
         Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
         managementAgent->setExchange (mExchange);
         dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+
+        mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf));
+        managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtObject));
+
+        // Since there is currently no support for virtual hosts, a management object
+        // representing the implied single virtual host is added here.
+        mgmtVhostObject = ManagementObjectVhost::shared_ptr (new ManagementObjectVhost (conf));
+        managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject));
     }
     else
         QPID_LOG(info, "Management not enabled");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Oct 31 13:06:05 2007
@@ -30,7 +30,9 @@
 #include "MessageStore.h"
 #include "QueueRegistry.h"
 #include "SessionManager.h"
-#include "ManagementAgent.h"
+#include "management/ManagementAgent.h"
+#include "management/ManagementObjectBroker.h"
+#include "management/ManagementObjectVhost.h"
 #include "qpid/Options.h"
 #include "qpid/Plugin.h"
 #include "qpid/Url.h"
@@ -65,10 +67,10 @@
         int connectionBacklog;
         std::string store;      
         long stagingThreshold;
-	string storeDir;
-	bool storeAsync;
-	bool enableMgmt;
-	uint16_t mgmtPubInterval;
+        string storeDir;
+        bool storeAsync;
+        bool enableMgmt;
+        uint16_t mgmtPubInterval;
         uint32_t ack;
     };
     
@@ -129,6 +131,8 @@
     HandlerUpdaters handlerUpdaters;
     SessionManager sessionManager;
     ManagementAgent::shared_ptr managementAgent;
+    ManagementObjectBroker::shared_ptr mgmtObject;
+    ManagementObjectVhost::shared_ptr  mgmtVhostObject;
 
     static MessageStore* createStore(const Options& config);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Wed Oct 31 13:06:05 2007
@@ -23,7 +23,7 @@
 #include "FanOutExchange.h"
 #include "HeadersExchange.h"
 #include "TopicExchange.h"
-#include "ManagementExchange.h"
+#include "management/ManagementExchange.h"
 #include "qpid/framing/reply_exceptions.h"
 
 using namespace qpid::broker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Oct 31 13:06:05 2007
@@ -35,7 +35,7 @@
 #include "PersistableQueue.h"
 #include "QueuePolicy.h"
 #include "QueueBindings.h"
-#include "ManagementObjectQueue.h"
+#include "management/ManagementObjectQueue.h"
 
 namespace qpid {
     namespace broker {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Wed Oct 31 13:06:05 2007
@@ -19,8 +19,8 @@
  *
  */
 #include "QueueRegistry.h"
-#include "ManagementAgent.h"
-#include "ManagementObjectQueue.h"
+#include "management/ManagementAgent.h"
+#include "management/ManagementObjectQueue.h"
 #include "qpid/log/Statement.h"
 #include <sstream>
 #include <assert.h>

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=590806&r1=590805&r2=590806&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Wed Oct 31 13:06:05 2007
@@ -24,7 +24,7 @@
 #include <map>
 #include "qpid/sys/Mutex.h"
 #include "Queue.h"
-#include "ManagementAgent.h"
+#include "management/ManagementAgent.h"
 
 namespace qpid {
 namespace broker {

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp Wed Oct 31 13:06:05 2007
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementAgent.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include <qpid/framing/AMQFrame.h>
+#include <list>
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+{
+    timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
+}
+
+void ManagementAgent::setExchange (Exchange::shared_ptr _exchange)
+{
+    exchange = _exchange;
+}
+
+void ManagementAgent::addObject (ManagementObject::shared_ptr object)
+{
+    managementObjects.push_back (object);
+    QPID_LOG(info, "Management Object Added");
+}
+
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+    : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+
+void ManagementAgent::Periodic::fire ()
+{
+    agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval)));
+    agent.PeriodicProcessing ();
+}
+
+void ManagementAgent::clientAdded (void)
+{
+    for (ManagementObjectVector::iterator iter = managementObjects.begin ();
+         iter != managementObjects.end ();
+         iter++)
+    {
+        ManagementObject::shared_ptr object = *iter;
+        object->setAllChanged   ();
+        object->setSchemaNeeded ();
+    }
+}
+
+void ManagementAgent::PeriodicProcessing (void)
+{
+#define BUFSIZE   65536
+#define THRESHOLD 16384
+    char      msgChars[BUFSIZE];
+    Buffer    msgBuffer (msgChars, BUFSIZE);
+    uint32_t  contentSize;
+    std::list<uint32_t> deleteList;
+
+    if (managementObjects.empty ())
+        return;
+        
+    Message::shared_ptr msg (new Message ());
+
+    // Build the magic number for the management message.
+    msgBuffer.putOctet ('A');
+    msgBuffer.putOctet ('M');
+    msgBuffer.putOctet ('0');
+    msgBuffer.putOctet ('1');
+
+    for (uint32_t idx = 0; idx < managementObjects.size (); idx++)
+    {
+        ManagementObject::shared_ptr object = managementObjects[idx];
+
+        if (object->getSchemaNeeded ())
+        {
+            uint32_t startAvail = msgBuffer.available ();
+            uint32_t recordLength;
+            
+            msgBuffer.putOctet ('S');  // opcode = Schema Record
+            msgBuffer.putOctet (0);    // content-class = N/A
+            msgBuffer.putShort (object->getObjectType ());
+            msgBuffer.record   (); // Record the position of the length field
+            msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+            object->writeSchema (msgBuffer);
+            recordLength = startAvail - msgBuffer.available ();
+            msgBuffer.restore (true);         // Restore pointer to length field
+            msgBuffer.putLong (recordLength);
+            msgBuffer.restore ();             // Re-restore to get to the end of the buffer
+        }
+
+        if (object->getConfigChanged ())
+        {
+            uint32_t startAvail = msgBuffer.available ();
+            uint32_t recordLength;
+            
+            msgBuffer.putOctet ('C');  // opcode = Content Record
+            msgBuffer.putOctet ('C');  // content-class = Configuration
+            msgBuffer.putShort (object->getObjectType ());
+            msgBuffer.record   (); // Record the position of the length field
+            msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+            object->writeConfig (msgBuffer);
+            recordLength = startAvail - msgBuffer.available ();
+            msgBuffer.restore (true);         // Restore pointer to length field
+            msgBuffer.putLong (recordLength);
+            msgBuffer.restore ();             // Re-restore to get to the end of the buffer
+        }
+        
+        if (object->getInstChanged ())
+        {
+            uint32_t startAvail = msgBuffer.available ();
+            uint32_t recordLength;
+            
+            msgBuffer.putOctet ('C');  // opcode = Content Record
+            msgBuffer.putOctet ('I');  // content-class = Instrumentation
+            msgBuffer.putShort (object->getObjectType ());
+            msgBuffer.record   (); // Record the position of the length field
+            msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+            object->writeInstrumentation (msgBuffer);
+            recordLength = startAvail - msgBuffer.available ();
+            msgBuffer.restore (true);         // Restore pointer to length field
+            msgBuffer.putLong (recordLength);
+            msgBuffer.restore ();             // Re-restore to get to the end of the buffer
+        }
+
+        if (object->isDeleted ())
+            deleteList.push_back (idx);
+
+        // Temporary protection against buffer overrun.
+        // This needs to be replaced with frame fragmentation.
+        if (msgBuffer.available () < THRESHOLD)
+            break;
+    }
+    
+    msgBuffer.putOctet ('X');  // End-of-message
+    msgBuffer.putOctet (0);
+    msgBuffer.putShort (0);
+    msgBuffer.putLong  (8);
+
+    contentSize = BUFSIZE - msgBuffer.available ();
+    msgBuffer.reset ();
+
+    AMQFrame method  (0, MessageTransferBody(ProtocolVersion(),
+                                             0, "qpid.management", 0, 0));
+    AMQFrame header  (0, AMQHeaderBody());
+    AMQFrame content;
+
+    content.setBody(AMQContentBody());
+    content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
+
+    method.setEof  (false);
+    header.setBof  (false);
+    header.setEof  (false);
+    content.setBof (false);
+
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+
+    MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setContentLength(contentSize);
+    msg->getFrames().append(content);
+
+    DeliverableMessage deliverable (msg);
+    exchange->route (deliverable, "mgmt", 0);
+
+    // Delete flagged objects
+    for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin ();
+         iter != deleteList.rend ();
+         iter++)
+    {
+        managementObjects.erase (managementObjects.begin () + *iter);
+    }
+    deleteList.clear ();
+}
+
+void ManagementAgent::dispatchCommand (Deliverable&      /*msg*/,
+                                       const string&     /*routingKey*/,
+                                       const FieldTable* /*args*/)
+{
+}
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h Wed Oct 31 13:06:05 2007
@@ -0,0 +1,73 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
+#include "ManagementObject.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid { 
+namespace broker {
+
+
+class ManagementAgent
+{
+  public:
+
+    typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+
+    ManagementAgent (uint16_t interval);
+
+    void setExchange     (Exchange::shared_ptr         exchange);
+    void addObject       (ManagementObject::shared_ptr object);
+    void clientAdded     (void);
+    void dispatchCommand (Deliverable&      msg,
+                          const string&     routingKey,
+                          const FieldTable* args);
+    
+  private:
+
+    struct Periodic : public TimerTask
+    {
+        ManagementAgent& agent;
+
+        Periodic (ManagementAgent& agent, uint32_t seconds);
+        ~Periodic () {}
+        void fire ();
+    };
+
+    ManagementObjectVector managementObjects;
+    Timer                  timer;
+    Exchange::shared_ptr   exchange;
+    uint16_t               interval;
+
+    void PeriodicProcessing (void);
+};
+
+}}
+            
+
+
+#endif  /*!_ManagementAgent_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp Wed Oct 31 13:06:05 2007
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementExchange::ManagementExchange (const string& _name) :
+    Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const std::string& _name,
+                                        bool               _durable,
+                                        const FieldTable&  _args) :
+    Exchange     (_name, _durable, _args),
+    TopicExchange(_name, _durable, _args) {}
+
+
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+                               const string&     routingKey,
+                               const FieldTable* args)
+{
+    bool result = TopicExchange::bind (queue, routingKey, args);
+
+    // Notify the management agent that a new management client has bound to the 
+    // exchange.
+    if (result)
+        managementAgent->clientAdded ();
+
+    return result;
+}
+
+void ManagementExchange::route (Deliverable&      msg,
+                                const string&     routingKey,
+                                const FieldTable* args)
+{
+    // Intercept management commands
+    if (routingKey.length () > 7 &&
+        routingKey.substr (0, 7).compare ("method.") == 0)
+    {
+        managementAgent->dispatchCommand (msg, routingKey, args);
+        return;
+    }
+
+    TopicExchange::route (msg, routingKey, args);
+}
+
+void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+{
+    managementAgent = agent;
+}
+
+
+ManagementExchange::~ManagementExchange() {}
+
+const std::string ManagementExchange::typeName("management");
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h Wed Oct 31 13:06:05 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementExchange_
+#define _ManagementExchange_
+
+#include "qpid/broker/TopicExchange.h"
+#include "ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementExchange : public virtual TopicExchange
+{
+  private:
+    ManagementAgent::shared_ptr managementAgent;
+ 
+  public:
+    static const std::string typeName;
+
+    ManagementExchange (const string& name);
+    ManagementExchange (const string& _name, bool _durable, 
+                        const qpid::framing::FieldTable& _args);
+
+    virtual std::string getType() const { return typeName; }            
+
+    virtual bool bind (Queue::shared_ptr queue,
+                       const string&     routingKey,
+                       const qpid::framing::FieldTable* args);
+
+    virtual void route (Deliverable& msg,
+                        const string& routingKey,
+                        const qpid::framing::FieldTable* args);
+
+    void setManagmentAgent (ManagementAgent::shared_ptr agent);
+
+    virtual ~ManagementExchange();
+};
+
+
+}
+}
+
+#endif

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp Wed Oct 31 13:06:05 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementObject.h"
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+void ManagementObject::schemaItem (Buffer&     buf,
+                                   uint8_t     typeCode,
+                                   std::string name,
+                                   std::string description,
+                                   bool        isConfig,
+                                   bool        isIndex)
+{
+    uint8_t flags =
+        (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0);
+
+    buf.putOctet       (flags);
+    buf.putOctet       (typeCode);
+    buf.putShortString (name);
+    buf.putShortString (description);
+}
+
+void ManagementObject::schemaListEnd (Buffer& buf)
+{
+    buf.putOctet (FLAG_END);
+}
+
+void ManagementObject::writeTimestamps (Buffer& buf)
+{
+    buf.putLongLong (uint64_t (Duration (now ())));
+    buf.putLongLong (createTime);
+    buf.putLongLong (destroyTime);
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h Wed Oct 31 13:06:05 2007
@@ -0,0 +1,117 @@
+#ifndef _ManagementObject_
+#define _ManagementObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid { 
+namespace broker {
+
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+const uint16_t OBJECT_SYSTEM      = 1;
+const uint16_t OBJECT_BROKER      = 2;
+const uint16_t OBJECT_VHOST       = 3;
+const uint16_t OBJECT_QUEUE       = 4;
+const uint16_t OBJECT_EXCHANGE    = 5;
+const uint16_t OBJECT_BINDING     = 6;
+const uint16_t OBJECT_CLIENT      = 7;
+const uint16_t OBJECT_SESSION     = 8;
+const uint16_t OBJECT_DESTINATION = 9;
+const uint16_t OBJECT_PRODUCER    = 10;
+const uint16_t OBJECT_CONSUMER    = 11;
+
+
+class ManagementObject
+{
+  protected:
+    
+    uint64_t createTime;
+    uint64_t destroyTime;
+    bool     configChanged;
+    bool     instChanged;
+    bool     deleted;
+    
+    static const uint8_t TYPE_UINT8  = 1;
+    static const uint8_t TYPE_UINT16 = 2;
+    static const uint8_t TYPE_UINT32 = 3;
+    static const uint8_t TYPE_UINT64 = 4;
+    static const uint8_t TYPE_BOOL   = 5;
+    static const uint8_t TYPE_STRING = 6;
+
+    static const uint8_t FLAG_CONFIG = 0x01;
+    static const uint8_t FLAG_INDEX  = 0x02;
+    static const uint8_t FLAG_END    = 0x80;
+    
+    void schemaItem (Buffer&     buf,
+                     uint8_t     typeCode,
+                     std::string name,
+                     std::string description,
+                     bool        isConfig = false,
+                     bool        isIndex  = false);
+    void schemaListEnd   (Buffer& buf);
+    void writeTimestamps (Buffer& buf);
+
+  public:
+    typedef boost::shared_ptr<ManagementObject> shared_ptr;
+
+    ManagementObject () : destroyTime(0), configChanged(true),
+                          instChanged(true), deleted(false)
+    { createTime = uint64_t (Duration (now ())); }
+    virtual ~ManagementObject () {}
+
+    virtual uint16_t    getObjectType        (void)        = 0;
+    virtual std::string getObjectName        (void)        = 0;
+    virtual void        writeSchema          (Buffer& buf) = 0;
+    virtual void        writeConfig          (Buffer& buf) = 0;
+    virtual void        writeInstrumentation (Buffer& buf) = 0;
+    virtual bool        getSchemaNeeded      (void)        = 0;
+    virtual void        setSchemaNeeded      (void)        = 0;
+
+    inline  bool getConfigChanged (void) { return configChanged; }
+    virtual bool getInstChanged   (void) { return instChanged; }
+    inline  void setAllChanged    (void)
+    {
+        configChanged = true;
+        instChanged   = true;
+    }
+
+    inline void resourceDestroy  (void) {
+        destroyTime = uint64_t (Duration (now ()));
+        deleted     = true;
+    }
+    bool isDeleted (void) { return deleted; }
+
+};
+
+ typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector;
+
+}}
+            
+
+
+#endif  /*!_ManagementObject_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp Wed Oct 31 13:06:05 2007
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "config.h"
+#include "qpid/broker/Broker.h"
+#include "ManagementObjectBroker.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool ManagementObjectBroker::schemaNeeded = true;
+
+ManagementObjectBroker::ManagementObjectBroker (const Options& _conf)
+{
+    Broker::Options& conf = (Broker::Options&) _conf;
+
+    sysId = "sysId";
+    port                 = conf.port;
+    workerThreads        = conf.workerThreads;
+    maxConns             = conf.maxConnections;
+    connBacklog          = conf.connectionBacklog;
+    stagingThreshold     = conf.stagingThreshold;
+    storeLib             = conf.store;
+    asyncStore           = conf.storeAsync;
+    mgmtPubInterval      = conf.mgmtPubInterval;
+    initialDiskPageSize  = 0;
+    initialPagesPerQueue = 0;
+    clusterName          = "";
+    version              = PACKAGE_VERSION;
+}
+
+ManagementObjectBroker::~ManagementObjectBroker () {}
+
+void ManagementObjectBroker::writeSchema (Buffer& buf)
+{
+    schemaNeeded = false;
+
+    schemaItem (buf, TYPE_STRING, "sysId",         "System ID", true, true);
+    schemaItem (buf, TYPE_UINT16, "port",          "TCP Port for AMQP Service", true);
+    schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true);
+    schemaItem (buf, TYPE_UINT16, "maxConns",      "Maximum allowed connections", true);
+    schemaItem (buf, TYPE_UINT16, "connBacklog",
+                "Connection backlog limit for listening socket", true);
+    schemaItem (buf, TYPE_UINT32, "stagingThreshold",
+                "Broker stages messages over this size to disk", true);
+    schemaItem (buf, TYPE_STRING, "storeLib",        "Name of persistent storage library", true);
+    schemaItem (buf, TYPE_UINT8,  "asyncStore",      "Use async persistent store", true);
+    schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true);
+    schemaItem (buf, TYPE_UINT32, "initialDiskPageSize",
+                "Number of disk pages allocated for storage", true);
+    schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue",
+                "Number of disk pages allocated per queue", true);
+    schemaItem (buf, TYPE_STRING, "clusterName",
+                "Name of cluster this server is a member of, zero-length for standalone server", true);
+    schemaItem (buf, TYPE_STRING, "version", "Running software version", true);
+
+    schemaListEnd (buf);
+}
+
+void ManagementObjectBroker::writeConfig (Buffer& buf)
+{
+    configChanged = false;
+
+    writeTimestamps    (buf);
+    buf.putShortString (sysId);
+    buf.putShort       (port);
+    buf.putShort       (workerThreads);
+    buf.putShort       (maxConns);
+    buf.putShort       (connBacklog);
+    buf.putLong        (stagingThreshold);
+    buf.putShortString (storeLib);
+    buf.putOctet       (asyncStore ? 1 : 0);
+    buf.putShort       (mgmtPubInterval);
+    buf.putLong        (initialDiskPageSize);
+    buf.putLong        (initialPagesPerQueue);
+    buf.putShortString (clusterName);
+    buf.putShortString (version);
+}
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h Wed Oct 31 13:06:05 2007
@@ -0,0 +1,75 @@
+#ifndef _ManagementObjectBroker_
+#define _ManagementObjectBroker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include "qpid/Options.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid { 
+namespace broker {
+
+class ManagementObjectBroker : public ManagementObject
+{
+  public:
+
+    typedef boost::shared_ptr<ManagementObjectBroker> shared_ptr;
+
+    ManagementObjectBroker  (const Options& conf);
+    ~ManagementObjectBroker (void);
+
+  private:
+
+    static bool schemaNeeded;
+
+    std::string objectName;
+
+    std::string sysId;
+    uint16_t    port;
+    uint16_t    workerThreads;
+    uint16_t    maxConns;
+    uint16_t    connBacklog;
+    uint32_t    stagingThreshold;
+    std::string storeLib;
+    bool        asyncStore;
+    uint16_t    mgmtPubInterval;
+    uint32_t    initialDiskPageSize;
+    uint32_t    initialPagesPerQueue;
+    std::string clusterName;
+    std::string version;
+
+    uint16_t    getObjectType        (void) { return OBJECT_BROKER; }
+    std::string getObjectName        (void) { return objectName; }
+    void        writeSchema          (Buffer& buf);
+    void        writeConfig          (Buffer& buf);
+    void        writeInstrumentation (Buffer& /*buf*/) {}
+    bool        getSchemaNeeded      (void) { return schemaNeeded; }
+    void        setSchemaNeeded      (void) { schemaNeeded = true; }
+
+    inline bool getInstChanged       (void) { return false; }
+};
+
+}}
+
+
+#endif  /*!_ManagementObjectBroker_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp Wed Oct 31 13:06:05 2007
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementObjectQueue.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool ManagementObjectQueue::schemaNeeded = true;
+
+ManagementObjectQueue::ManagementObjectQueue (std::string& _name,
+                                              bool _durable, bool _autoDelete) :
+    vhostName("/"), name(_name), durable(_durable), autoDelete(_autoDelete)
+{
+    msgTotalEnqueues     = 0;
+    msgTotalDequeues     = 0;
+    msgTxEnqueues        = 0;
+    msgTxDequeues        = 0;
+    msgPersistEnqueues   = 0;
+    msgPersistDequeues   = 0;
+
+    msgDepth             = 0;
+    msgDepthLow          = 0;
+    msgDepthHigh         = 0;
+
+    byteTotalEnqueues    = 0;
+    byteTotalDequeues    = 0;
+    byteTxEnqueues       = 0;
+    byteTxDequeues       = 0;
+    bytePersistEnqueues  = 0;
+    bytePersistDequeues  = 0;
+    
+    byteDepth            = 0;
+    byteDepthLow         = 0;
+    byteDepthHigh        = 0;
+
+    enqueueTxStarts      = 0;
+    enqueueTxCommits     = 0;
+    enqueueTxRejects     = 0;
+    dequeueTxStarts      = 0;
+    dequeueTxCommits     = 0;
+    dequeueTxRejects     = 0;
+    
+    enqueueTxCount       = 0;
+    enqueueTxCountLow    = 0;
+    enqueueTxCountHigh   = 0;
+
+    dequeueTxCount       = 0;
+    dequeueTxCountLow    = 0;
+    dequeueTxCountHigh   = 0;
+
+    consumers            = 0;
+    consumersLow         = 0;
+    consumersHigh        = 0;
+}
+
+ManagementObjectQueue::~ManagementObjectQueue () {}
+
+void ManagementObjectQueue::writeSchema (Buffer& buf)
+{
+    schemaNeeded = false;
+
+    schemaItem (buf, TYPE_STRING, "vhostRef",            "Virtual Host Ref", true, true);
+    schemaItem (buf, TYPE_STRING, "name",                "Queue Name", true, true);
+    schemaItem (buf, TYPE_BOOL,   "durable",             "Durable",    true);
+    schemaItem (buf, TYPE_BOOL,   "autoDelete",          "AutoDelete", true);
+
+    schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues",    "Total messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgTotalDequeues",    "Total messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "msgTxEnqueues",       "Transactional messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgTxDequeues",       "Transactional messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues",  "Persistent messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgPersistDequeues",  "Persistent messages dequeued");
+    schemaItem (buf, TYPE_UINT32, "msgDepth",            "Current size of queue in messages");
+    schemaItem (buf, TYPE_UINT32, "msgDepthLow",         "Low-water queue size, this interval");
+    schemaItem (buf, TYPE_UINT32, "msgDepthHigh",        "High-water queue size, this interval");
+    schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues",   "Total messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "byteTotalDequeues",   "Total messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "byteTxEnqueues",      "Transactional messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "byteTxDequeues",      "Transactional messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued");
+    schemaItem (buf, TYPE_UINT32, "byteDepth",           "Current size of queue in bytes");
+    schemaItem (buf, TYPE_UINT32, "byteDepthLow",        "Low-water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "byteDepthHigh",       "High-water mark this interval");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxStarts",     "Total enqueue transactions started ");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxCommits",    "Total enqueue transactions committed");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxRejects",    "Total enqueue transactions rejected");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxCount",      "Current pending enqueue transactions");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow",   "Low water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh",  "High water mark this interval");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxStarts",     "Total dequeue transactions started ");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxCommits",    "Total dequeue transactions committed");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxRejects",    "Total dequeue transactions rejected");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxCount",      "Current pending dequeue transactions");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow",   "Transaction low water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh",  "Transaction high water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "consumers",           "Current consumers on queue");
+    schemaItem (buf, TYPE_UINT32, "consumersLow",        "Consumer low water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "consumersHigh",       "Consumer high water mark this interval");
+
+    schemaListEnd (buf);
+}
+
+void ManagementObjectQueue::writeConfig (Buffer& buf)
+{
+    configChanged = false;
+
+    writeTimestamps    (buf);
+    buf.putShortString (vhostName);
+    buf.putShortString (name);
+    buf.putOctet       (durable    ? 1 : 0);
+    buf.putOctet       (autoDelete ? 1 : 0);
+}
+
+void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
+{
+    instChanged = false;
+
+    writeTimestamps (buf);
+    buf.putShortString (vhostName);
+    buf.putShortString (name);
+    buf.putLongLong (msgTotalEnqueues);
+    buf.putLongLong (msgTotalDequeues);
+    buf.putLongLong (msgTxEnqueues);
+    buf.putLongLong (msgTxDequeues);
+    buf.putLongLong (msgPersistEnqueues);
+    buf.putLongLong (msgPersistDequeues);
+    buf.putLong     (msgDepth);
+    buf.putLong     (msgDepthLow);
+    buf.putLong     (msgDepthHigh);
+    buf.putLongLong (byteTotalEnqueues);
+    buf.putLongLong (byteTotalDequeues);
+    buf.putLongLong (byteTxEnqueues);
+    buf.putLongLong (byteTxDequeues);
+    buf.putLongLong (bytePersistEnqueues);
+    buf.putLongLong (bytePersistDequeues);
+    buf.putLong     (byteDepth);
+    buf.putLong     (byteDepthLow);
+    buf.putLong     (byteDepthHigh);
+    buf.putLongLong (enqueueTxStarts);
+    buf.putLongLong (enqueueTxCommits);
+    buf.putLongLong (enqueueTxRejects);
+    buf.putLong     (enqueueTxCount);
+    buf.putLong     (enqueueTxCountLow);
+    buf.putLong     (enqueueTxCountHigh);
+    buf.putLongLong (dequeueTxStarts);
+    buf.putLongLong (dequeueTxCommits);
+    buf.putLongLong (dequeueTxRejects);
+    buf.putLong     (dequeueTxCount);
+    buf.putLong     (dequeueTxCountLow);
+    buf.putLong     (dequeueTxCountHigh);
+    buf.putLong     (consumers);
+    buf.putLong     (consumersLow);
+    buf.putLong     (consumersHigh);
+
+    msgDepthLow        = msgDepth;
+    msgDepthHigh       = msgDepth;
+    byteDepthLow       = byteDepth;
+    byteDepthHigh      = byteDepth;
+    enqueueTxCountLow  = enqueueTxCount;
+    enqueueTxCountHigh = enqueueTxCount;
+    dequeueTxCountLow  = dequeueTxCount;
+    dequeueTxCountHigh = dequeueTxCount;
+    consumersLow       = consumers;
+    consumersHigh      = consumers;
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h Wed Oct 31 13:06:05 2007
@@ -0,0 +1,181 @@
+#ifndef _ManagementObjectQueue_
+#define _ManagementObjectQueue_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+namespace qpid { 
+namespace broker {
+
+const uint32_t MSG_MASK_TX      = 1;  // Transactional message
+const uint32_t MSG_MASK_PERSIST = 2;  // Persistent message
+
+class ManagementObjectQueue : public ManagementObject
+{
+  private:
+
+    static bool schemaNeeded;
+
+    std::string objectName;
+    std::string vhostName;
+    std::string name;
+    bool        durable;
+    bool        autoDelete;
+  
+    uint64_t  msgTotalEnqueues;     // Total messages enqueued
+    uint64_t  msgTotalDequeues;     // Total messages dequeued
+    uint64_t  msgTxEnqueues;        // Transactional messages enqueued
+    uint64_t  msgTxDequeues;        // Transactional messages dequeued
+    uint64_t  msgPersistEnqueues;   // Persistent messages enqueued
+    uint64_t  msgPersistDequeues;   // Persistent messages dequeued
+    
+    uint32_t  msgDepth;             // Current size of queue in messages
+    uint32_t  msgDepthLow;          // Low-water queue size, this interval
+    uint32_t  msgDepthHigh;         // High-water queue size, this interval
+
+    uint64_t  byteTotalEnqueues;    // Total messages enqueued
+    uint64_t  byteTotalDequeues;    // Total messages dequeued
+    uint64_t  byteTxEnqueues;       // Transactional messages enqueued
+    uint64_t  byteTxDequeues;       // Transactional messages dequeued
+    uint64_t  bytePersistEnqueues;  // Persistent messages enqueued
+    uint64_t  bytePersistDequeues;  // Persistent messages dequeued
+
+    uint32_t  byteDepth;            // Current size of queue in bytes
+    uint32_t  byteDepthLow;         // Low-water mark this interval
+    uint32_t  byteDepthHigh;        // High-water mark this interval
+    
+    uint64_t  enqueueTxStarts;      // Total enqueue transactions started 
+    uint64_t  enqueueTxCommits;     // Total enqueue transactions committed
+    uint64_t  enqueueTxRejects;     // Total enqueue transactions rejected
+    
+    uint32_t  enqueueTxCount;       // Current pending enqueue transactions
+    uint32_t  enqueueTxCountLow;    // Low water mark this interval
+    uint32_t  enqueueTxCountHigh;   // High water mark this interval
+    
+    uint64_t  dequeueTxStarts;      // Total dequeue transactions started 
+    uint64_t  dequeueTxCommits;     // Total dequeue transactions committed
+    uint64_t  dequeueTxRejects;     // Total dequeue transactions rejected
+    
+    uint32_t  dequeueTxCount;       // Current pending dequeue transactions
+    uint32_t  dequeueTxCountLow;    // Low water mark this interval
+    uint32_t  dequeueTxCountHigh;   // High water mark this interval
+    
+    uint32_t  consumers;            // Current consumers on queue
+    uint32_t  consumersLow;         // Low water mark this interval
+    uint32_t  consumersHigh;        // High water mark this interval
+
+    uint16_t    getObjectType        (void) { return OBJECT_QUEUE; }
+    std::string getObjectName        (void) { return objectName; }
+    void        writeSchema          (Buffer& buf);
+    void        writeConfig          (Buffer& buf);
+    void        writeInstrumentation (Buffer& buf);
+    bool        getSchemaNeeded      (void) { return schemaNeeded; }
+    void        setSchemaNeeded      (void) { schemaNeeded = true; }
+    
+    inline void adjustQueueHiLo (void){
+        if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+        if (msgDepth < msgDepthLow)  msgDepthLow  = msgDepth;
+
+        if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+        if (byteDepth < byteDepthLow)  byteDepthLow  = byteDepth;
+        instChanged = true;
+    }
+    
+    inline void adjustTxHiLo (void){
+        if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
+        if (enqueueTxCount < enqueueTxCountLow)  enqueueTxCountLow  = enqueueTxCount;
+        if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
+        if (dequeueTxCount < dequeueTxCountLow)  dequeueTxCountLow  = dequeueTxCount;
+        instChanged = true;
+    }
+    
+    inline void adjustConsumerHiLo (void){
+        if (consumers > consumersHigh) consumersHigh = consumers;
+        if (consumers < consumersLow)  consumersLow  = consumers;
+        instChanged = true;
+    }
+
+  public:
+
+    typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr;
+
+    ManagementObjectQueue  (std::string& name, bool durable, bool autoDelete);
+    ~ManagementObjectQueue (void);
+
+    // The following mask contents are used to describe enqueued or dequeued
+    // messages when counting statistics.
+
+    inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
+        msgTotalEnqueues++;
+        byteTotalEnqueues += bytes;
+        
+        if (attrMask & MSG_MASK_TX){
+            msgTxEnqueues++;
+            byteTxEnqueues += bytes;
+        }
+        
+        if (attrMask & MSG_MASK_PERSIST){
+            msgPersistEnqueues++;
+            bytePersistEnqueues += bytes;
+        }
+
+        msgDepth++;
+        byteDepth += bytes;
+        adjustQueueHiLo ();
+    }
+    
+    inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
+        msgTotalDequeues++;
+        byteTotalDequeues += bytes;
+
+        if (attrMask & MSG_MASK_TX){
+            msgTxDequeues++;
+            byteTxDequeues += bytes;
+        }
+        
+        if (attrMask & MSG_MASK_PERSIST){
+            msgPersistDequeues++;
+            bytePersistDequeues += bytes;
+        }
+
+        msgDepth--;
+        byteDepth -= bytes;
+        adjustQueueHiLo ();
+    }
+    
+    inline void incConsumers (void){
+        consumers++;
+        adjustConsumerHiLo ();
+    }
+    
+    inline void decConsumers (void){
+        consumers--;
+        adjustConsumerHiLo ();
+    }
+};
+
+}}
+            
+
+
+#endif  /*!_ManagementObjectQueue_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp Wed Oct 31 13:06:05 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "qpid/broker/Broker.h"
+#include "ManagementObjectVhost.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool ManagementObjectVhost::schemaNeeded = true;
+
+ManagementObjectVhost::ManagementObjectVhost (const Options& /*_conf*/)
+{
+    name = "/";
+}
+
+ManagementObjectVhost::~ManagementObjectVhost () {}
+
+void ManagementObjectVhost::writeSchema (Buffer& buf)
+{
+    schemaNeeded = false;
+
+    schemaItem (buf, TYPE_STRING, "name", "Name of virtual host",      true, true);
+
+    schemaListEnd (buf);
+}
+
+void ManagementObjectVhost::writeConfig (Buffer& buf)
+{
+    configChanged = false;
+
+    writeTimestamps    (buf);
+    buf.putShortString (name);
+}
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h?rev=590806&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h Wed Oct 31 13:06:05 2007
@@ -0,0 +1,63 @@
+#ifndef _ManagementObjectVhost_
+#define _ManagementObjectVhost_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include "qpid/Options.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid { 
+namespace broker {
+
+class ManagementObjectVhost : public ManagementObject
+{
+  public:
+
+    typedef boost::shared_ptr<ManagementObjectVhost> shared_ptr;
+
+    ManagementObjectVhost  (const Options& conf);
+    ~ManagementObjectVhost (void);
+
+  private:
+
+    static bool schemaNeeded;
+
+    std::string objectName;
+
+    std::string name;
+
+    uint16_t    getObjectType        (void) { return OBJECT_VHOST; }
+    std::string getObjectName        (void) { return objectName; }
+    void        writeSchema          (Buffer& buf);
+    void        writeConfig          (Buffer& buf);
+    void        writeInstrumentation (Buffer& /*buf*/) {}
+    bool        getSchemaNeeded      (void) { return schemaNeeded; }
+    void        setSchemaNeeded      (void) { schemaNeeded = true; }
+
+    inline bool getInstChanged       (void) { return false; }
+};
+
+}}
+
+
+#endif  /*!_ManagementObjectVhost_*/



Mime
View raw message