From qpid-commits-return-3942-apmail-incubator-qpid-commits-archive=incubator.apache.org@incubator.apache.org Wed Oct 31 20:06:50 2007 Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 38682 invoked from network); 31 Oct 2007 20:06:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 31 Oct 2007 20:06:50 -0000 Received: (qmail 67174 invoked by uid 500); 31 Oct 2007 20:06:38 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 67160 invoked by uid 500); 31 Oct 2007 20:06:38 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 67151 invoked by uid 99); 31 Oct 2007 20:06:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2007 13:06:37 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2007 20:06:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 237601A9838; Wed, 31 Oct 2007 13:06:11 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r590806 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/broker/management/ Date: Wed, 31 Oct 2007 20:06:07 -0000 To: qpid-commits@incubator.apache.org From: cctrieloff@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071031200612.237601A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cctrieloff Date: Wed Oct 31 13:06:05 2007 New Revision: 590806 URL: http://svn.apache.org/viewvc?rev=590806&view=rev Log: Patch from Ted QPID-668 This patch does two things: 1) Adds management objects for "broker" and "virtual host". 2) Moves all management-related source files from qpid/broker to qpid/broker/management. Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h Removed: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Oct 31 13:06:05 2007 @@ -159,10 +159,12 @@ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/IncomingExecutionContext.cpp \ - qpid/broker/ManagementAgent.cpp \ - qpid/broker/ManagementExchange.cpp \ - qpid/broker/ManagementObject.cpp \ - qpid/broker/ManagementObjectQueue.cpp \ + qpid/broker/management/ManagementAgent.cpp \ + qpid/broker/management/ManagementExchange.cpp \ + qpid/broker/management/ManagementObject.cpp \ + qpid/broker/management/ManagementObjectBroker.cpp \ + qpid/broker/management/ManagementObjectQueue.cpp \ + qpid/broker/management/ManagementObjectVhost.cpp \ qpid/broker/Message.cpp \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ @@ -256,10 +258,12 @@ qpid/broker/HandlerImpl.h \ qpid/broker/HeadersExchange.h \ qpid/broker/IncomingExecutionContext.h \ - qpid/broker/ManagementAgent.h \ - qpid/broker/ManagementExchange.h \ - qpid/broker/ManagementObject.h \ - qpid/broker/ManagementObjectQueue.h \ + qpid/broker/management/ManagementAgent.h \ + qpid/broker/management/ManagementExchange.h \ + qpid/broker/management/ManagementObject.h \ + qpid/broker/management/ManagementObjectBroker.h \ + qpid/broker/management/ManagementObjectQueue.h \ + qpid/broker/management/ManagementObjectVhost.h \ qpid/broker/Message.h \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Oct 31 13:06:05 2007 @@ -28,7 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" -#include "ManagementExchange.h" +#include "management/ManagementExchange.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" @@ -125,6 +125,14 @@ Exchange::shared_ptr mExchange = exchanges.get (qpid_management); managementAgent->setExchange (mExchange); dynamic_pointer_cast(mExchange)->setManagmentAgent (managementAgent); + + mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf)); + managementAgent->addObject (dynamic_pointer_cast(mgmtObject)); + + // Since there is currently no support for virtual hosts, a management object + // representing the implied single virtual host is added here. + mgmtVhostObject = ManagementObjectVhost::shared_ptr (new ManagementObjectVhost (conf)); + managementAgent->addObject (dynamic_pointer_cast(mgmtVhostObject)); } else QPID_LOG(info, "Management not enabled"); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Oct 31 13:06:05 2007 @@ -30,7 +30,9 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" -#include "ManagementAgent.h" +#include "management/ManagementAgent.h" +#include "management/ManagementObjectBroker.h" +#include "management/ManagementObjectVhost.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/Url.h" @@ -65,10 +67,10 @@ int connectionBacklog; std::string store; long stagingThreshold; - string storeDir; - bool storeAsync; - bool enableMgmt; - uint16_t mgmtPubInterval; + string storeDir; + bool storeAsync; + bool enableMgmt; + uint16_t mgmtPubInterval; uint32_t ack; }; @@ -129,6 +131,8 @@ HandlerUpdaters handlerUpdaters; SessionManager sessionManager; ManagementAgent::shared_ptr managementAgent; + ManagementObjectBroker::shared_ptr mgmtObject; + ManagementObjectVhost::shared_ptr mgmtVhostObject; static MessageStore* createStore(const Options& config); }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Wed Oct 31 13:06:05 2007 @@ -23,7 +23,7 @@ #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" -#include "ManagementExchange.h" +#include "management/ManagementExchange.h" #include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Oct 31 13:06:05 2007 @@ -35,7 +35,7 @@ #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" -#include "ManagementObjectQueue.h" +#include "management/ManagementObjectQueue.h" namespace qpid { namespace broker { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Wed Oct 31 13:06:05 2007 @@ -19,8 +19,8 @@ * */ #include "QueueRegistry.h" -#include "ManagementAgent.h" -#include "ManagementObjectQueue.h" +#include "management/ManagementAgent.h" +#include "management/ManagementObjectQueue.h" #include "qpid/log/Statement.h" #include #include Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=590806&r1=590805&r2=590806&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Wed Oct 31 13:06:05 2007 @@ -24,7 +24,7 @@ #include #include "qpid/sys/Mutex.h" #include "Queue.h" -#include "ManagementAgent.h" +#include "management/ManagementAgent.h" namespace qpid { namespace broker { Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp Wed Oct 31 13:06:05 2007 @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementAgent.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include +#include +#include +#include + +using namespace qpid::framing; +using namespace qpid::broker; +using namespace qpid::sys; + +ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +{ + timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); +} + +void ManagementAgent::setExchange (Exchange::shared_ptr _exchange) +{ + exchange = _exchange; +} + +void ManagementAgent::addObject (ManagementObject::shared_ptr object) +{ + managementObjects.push_back (object); + QPID_LOG(info, "Management Object Added"); +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); + agent.PeriodicProcessing (); +} + +void ManagementAgent::clientAdded (void) +{ + for (ManagementObjectVector::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = *iter; + object->setAllChanged (); + object->setSchemaNeeded (); + } +} + +void ManagementAgent::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 +#define THRESHOLD 16384 + char msgChars[BUFSIZE]; + Buffer msgBuffer (msgChars, BUFSIZE); + uint32_t contentSize; + std::list deleteList; + + if (managementObjects.empty ()) + return; + + Message::shared_ptr msg (new Message ()); + + // Build the magic number for the management message. + msgBuffer.putOctet ('A'); + msgBuffer.putOctet ('M'); + msgBuffer.putOctet ('0'); + msgBuffer.putOctet ('1'); + + for (uint32_t idx = 0; idx < managementObjects.size (); idx++) + { + ManagementObject::shared_ptr object = managementObjects[idx]; + + if (object->getSchemaNeeded ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('S'); // opcode = Schema Record + msgBuffer.putOctet (0); // content-class = N/A + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeSchema (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getConfigChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('C'); // content-class = Configuration + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeConfig (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getInstChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('I'); // content-class = Instrumentation + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeInstrumentation (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->isDeleted ()) + deleteList.push_back (idx); + + // Temporary protection against buffer overrun. + // This needs to be replaced with frame fragmentation. + if (msgBuffer.available () < THRESHOLD) + break; + } + + msgBuffer.putOctet ('X'); // End-of-message + msgBuffer.putOctet (0); + msgBuffer.putShort (0); + msgBuffer.putLong (8); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "qpid.management", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody()->decode(msgBuffer, contentSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = msg->getFrames().getHeaders()->get(true); + props->setContentLength(contentSize); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, "mgmt", 0); + + // Delete flagged objects + for (std::list::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + { + managementObjects.erase (managementObjects.begin () + *iter); + } + deleteList.clear (); +} + +void ManagementAgent::dispatchCommand (Deliverable& /*msg*/, + const string& /*routingKey*/, + const FieldTable* /*args*/) +{ +} + Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h Wed Oct 31 13:06:05 2007 @@ -0,0 +1,73 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "ManagementObject.h" +#include + +namespace qpid { +namespace broker { + + +class ManagementAgent +{ + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementAgent (uint16_t interval); + + void setExchange (Exchange::shared_ptr exchange); + void addObject (ManagementObject::shared_ptr object); + void clientAdded (void); + void dispatchCommand (Deliverable& msg, + const string& routingKey, + const FieldTable* args); + + private: + + struct Periodic : public TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + ~Periodic () {} + void fire (); + }; + + ManagementObjectVector managementObjects; + Timer timer; + Exchange::shared_ptr exchange; + uint16_t interval; + + void PeriodicProcessing (void); +}; + +}} + + + +#endif /*!_ManagementAgent_*/ Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp Wed Oct 31 13:06:05 2007 @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementExchange::ManagementExchange (const string& _name) : + Exchange (_name), TopicExchange(_name) {} +ManagementExchange::ManagementExchange (const std::string& _name, + bool _durable, + const FieldTable& _args) : + Exchange (_name, _durable, _args), + TopicExchange(_name, _durable, _args) {} + + +bool ManagementExchange::bind (Queue::shared_ptr queue, + const string& routingKey, + const FieldTable* args) +{ + bool result = TopicExchange::bind (queue, routingKey, args); + + // Notify the management agent that a new management client has bound to the + // exchange. + if (result) + managementAgent->clientAdded (); + + return result; +} + +void ManagementExchange::route (Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + // Intercept management commands + if (routingKey.length () > 7 && + routingKey.substr (0, 7).compare ("method.") == 0) + { + managementAgent->dispatchCommand (msg, routingKey, args); + return; + } + + TopicExchange::route (msg, routingKey, args); +} + +void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +{ + managementAgent = agent; +} + + +ManagementExchange::~ManagementExchange() {} + +const std::string ManagementExchange::typeName("management"); + Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h Wed Oct 31 13:06:05 2007 @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementExchange_ +#define _ManagementExchange_ + +#include "qpid/broker/TopicExchange.h" +#include "ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementExchange : public virtual TopicExchange +{ + private: + ManagementAgent::shared_ptr managementAgent; + + public: + static const std::string typeName; + + ManagementExchange (const string& name); + ManagementExchange (const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); + + virtual std::string getType() const { return typeName; } + + virtual bool bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + virtual void route (Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent (ManagementAgent::shared_ptr agent); + + virtual ~ManagementExchange(); +}; + + +} +} + +#endif Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp Wed Oct 31 13:06:05 2007 @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +using namespace qpid::framing; +using namespace qpid::broker; +using namespace qpid::sys; + +void ManagementObject::schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig, + bool isIndex) +{ + uint8_t flags = + (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); + + buf.putOctet (flags); + buf.putOctet (typeCode); + buf.putShortString (name); + buf.putShortString (description); +} + +void ManagementObject::schemaListEnd (Buffer& buf) +{ + buf.putOctet (FLAG_END); +} + +void ManagementObject::writeTimestamps (Buffer& buf) +{ + buf.putLongLong (uint64_t (Duration (now ()))); + buf.putLongLong (createTime); + buf.putLongLong (destroyTime); +} Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h Wed Oct 31 13:06:05 2007 @@ -0,0 +1,117 @@ +#ifndef _ManagementObject_ +#define _ManagementObject_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include +#include +#include + +namespace qpid { +namespace broker { + +using namespace qpid::framing; +using namespace qpid::sys; + +const uint16_t OBJECT_SYSTEM = 1; +const uint16_t OBJECT_BROKER = 2; +const uint16_t OBJECT_VHOST = 3; +const uint16_t OBJECT_QUEUE = 4; +const uint16_t OBJECT_EXCHANGE = 5; +const uint16_t OBJECT_BINDING = 6; +const uint16_t OBJECT_CLIENT = 7; +const uint16_t OBJECT_SESSION = 8; +const uint16_t OBJECT_DESTINATION = 9; +const uint16_t OBJECT_PRODUCER = 10; +const uint16_t OBJECT_CONSUMER = 11; + + +class ManagementObject +{ + protected: + + uint64_t createTime; + uint64_t destroyTime; + bool configChanged; + bool instChanged; + bool deleted; + + static const uint8_t TYPE_UINT8 = 1; + static const uint8_t TYPE_UINT16 = 2; + static const uint8_t TYPE_UINT32 = 3; + static const uint8_t TYPE_UINT64 = 4; + static const uint8_t TYPE_BOOL = 5; + static const uint8_t TYPE_STRING = 6; + + static const uint8_t FLAG_CONFIG = 0x01; + static const uint8_t FLAG_INDEX = 0x02; + static const uint8_t FLAG_END = 0x80; + + void schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig = false, + bool isIndex = false); + void schemaListEnd (Buffer& buf); + void writeTimestamps (Buffer& buf); + + public: + typedef boost::shared_ptr shared_ptr; + + ManagementObject () : destroyTime(0), configChanged(true), + instChanged(true), deleted(false) + { createTime = uint64_t (Duration (now ())); } + virtual ~ManagementObject () {} + + virtual uint16_t getObjectType (void) = 0; + virtual std::string getObjectName (void) = 0; + virtual void writeSchema (Buffer& buf) = 0; + virtual void writeConfig (Buffer& buf) = 0; + virtual void writeInstrumentation (Buffer& buf) = 0; + virtual bool getSchemaNeeded (void) = 0; + virtual void setSchemaNeeded (void) = 0; + + inline bool getConfigChanged (void) { return configChanged; } + virtual bool getInstChanged (void) { return instChanged; } + inline void setAllChanged (void) + { + configChanged = true; + instChanged = true; + } + + inline void resourceDestroy (void) { + destroyTime = uint64_t (Duration (now ())); + deleted = true; + } + bool isDeleted (void) { return deleted; } + +}; + + typedef std::vector ManagementObjectVector; + +}} + + + +#endif /*!_ManagementObject_*/ Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp Wed Oct 31 13:06:05 2007 @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "qpid/broker/Broker.h" +#include "ManagementObjectBroker.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectBroker::schemaNeeded = true; + +ManagementObjectBroker::ManagementObjectBroker (const Options& _conf) +{ + Broker::Options& conf = (Broker::Options&) _conf; + + sysId = "sysId"; + port = conf.port; + workerThreads = conf.workerThreads; + maxConns = conf.maxConnections; + connBacklog = conf.connectionBacklog; + stagingThreshold = conf.stagingThreshold; + storeLib = conf.store; + asyncStore = conf.storeAsync; + mgmtPubInterval = conf.mgmtPubInterval; + initialDiskPageSize = 0; + initialPagesPerQueue = 0; + clusterName = ""; + version = PACKAGE_VERSION; +} + +ManagementObjectBroker::~ManagementObjectBroker () {} + +void ManagementObjectBroker::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "sysId", "System ID", true, true); + schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true); + schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); + schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); + schemaItem (buf, TYPE_UINT16, "connBacklog", + "Connection backlog limit for listening socket", true); + schemaItem (buf, TYPE_UINT32, "stagingThreshold", + "Broker stages messages over this size to disk", true); + schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); + schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); + schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); + schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", + "Number of disk pages allocated for storage", true); + schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", + "Number of disk pages allocated per queue", true); + schemaItem (buf, TYPE_STRING, "clusterName", + "Name of cluster this server is a member of, zero-length for standalone server", true); + schemaItem (buf, TYPE_STRING, "version", "Running software version", true); + + schemaListEnd (buf); +} + +void ManagementObjectBroker::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (sysId); + buf.putShort (port); + buf.putShort (workerThreads); + buf.putShort (maxConns); + buf.putShort (connBacklog); + buf.putLong (stagingThreshold); + buf.putShortString (storeLib); + buf.putOctet (asyncStore ? 1 : 0); + buf.putShort (mgmtPubInterval); + buf.putLong (initialDiskPageSize); + buf.putLong (initialPagesPerQueue); + buf.putShortString (clusterName); + buf.putShortString (version); +} + Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h Wed Oct 31 13:06:05 2007 @@ -0,0 +1,75 @@ +#ifndef _ManagementObjectBroker_ +#define _ManagementObjectBroker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include "qpid/Options.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace broker { + +class ManagementObjectBroker : public ManagementObject +{ + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementObjectBroker (const Options& conf); + ~ManagementObjectBroker (void); + + private: + + static bool schemaNeeded; + + std::string objectName; + + std::string sysId; + uint16_t port; + uint16_t workerThreads; + uint16_t maxConns; + uint16_t connBacklog; + uint32_t stagingThreshold; + std::string storeLib; + bool asyncStore; + uint16_t mgmtPubInterval; + uint32_t initialDiskPageSize; + uint32_t initialPagesPerQueue; + std::string clusterName; + std::string version; + + uint16_t getObjectType (void) { return OBJECT_BROKER; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementObjectBroker_*/ Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp Wed Oct 31 13:06:05 2007 @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObjectQueue.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectQueue::schemaNeeded = true; + +ManagementObjectQueue::ManagementObjectQueue (std::string& _name, + bool _durable, bool _autoDelete) : + vhostName("/"), name(_name), durable(_durable), autoDelete(_autoDelete) +{ + msgTotalEnqueues = 0; + msgTotalDequeues = 0; + msgTxEnqueues = 0; + msgTxDequeues = 0; + msgPersistEnqueues = 0; + msgPersistDequeues = 0; + + msgDepth = 0; + msgDepthLow = 0; + msgDepthHigh = 0; + + byteTotalEnqueues = 0; + byteTotalDequeues = 0; + byteTxEnqueues = 0; + byteTxDequeues = 0; + bytePersistEnqueues = 0; + bytePersistDequeues = 0; + + byteDepth = 0; + byteDepthLow = 0; + byteDepthHigh = 0; + + enqueueTxStarts = 0; + enqueueTxCommits = 0; + enqueueTxRejects = 0; + dequeueTxStarts = 0; + dequeueTxCommits = 0; + dequeueTxRejects = 0; + + enqueueTxCount = 0; + enqueueTxCountLow = 0; + enqueueTxCountHigh = 0; + + dequeueTxCount = 0; + dequeueTxCountLow = 0; + dequeueTxCountHigh = 0; + + consumers = 0; + consumersLow = 0; + consumersHigh = 0; +} + +ManagementObjectQueue::~ManagementObjectQueue () {} + +void ManagementObjectQueue::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "vhostRef", "Virtual Host Ref", true, true); + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true, true); + schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); + schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); + + schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); + schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); + schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); + schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "byteTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); + schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); + schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); + schemaItem (buf, TYPE_UINT64, "enqueueTxStarts", "Total enqueue transactions started "); + schemaItem (buf, TYPE_UINT64, "enqueueTxCommits", "Total enqueue transactions committed"); + schemaItem (buf, TYPE_UINT64, "enqueueTxRejects", "Total enqueue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCount", "Current pending enqueue transactions"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow", "Low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh", "High water mark this interval"); + schemaItem (buf, TYPE_UINT64, "dequeueTxStarts", "Total dequeue transactions started "); + schemaItem (buf, TYPE_UINT64, "dequeueTxCommits", "Total dequeue transactions committed"); + schemaItem (buf, TYPE_UINT64, "dequeueTxRejects", "Total dequeue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCount", "Current pending dequeue transactions"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow", "Transaction low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh", "Transaction high water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); + schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); + + schemaListEnd (buf); +} + +void ManagementObjectQueue::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (vhostName); + buf.putShortString (name); + buf.putOctet (durable ? 1 : 0); + buf.putOctet (autoDelete ? 1 : 0); +} + +void ManagementObjectQueue::writeInstrumentation (Buffer& buf) +{ + instChanged = false; + + writeTimestamps (buf); + buf.putShortString (vhostName); + buf.putShortString (name); + buf.putLongLong (msgTotalEnqueues); + buf.putLongLong (msgTotalDequeues); + buf.putLongLong (msgTxEnqueues); + buf.putLongLong (msgTxDequeues); + buf.putLongLong (msgPersistEnqueues); + buf.putLongLong (msgPersistDequeues); + buf.putLong (msgDepth); + buf.putLong (msgDepthLow); + buf.putLong (msgDepthHigh); + buf.putLongLong (byteTotalEnqueues); + buf.putLongLong (byteTotalDequeues); + buf.putLongLong (byteTxEnqueues); + buf.putLongLong (byteTxDequeues); + buf.putLongLong (bytePersistEnqueues); + buf.putLongLong (bytePersistDequeues); + buf.putLong (byteDepth); + buf.putLong (byteDepthLow); + buf.putLong (byteDepthHigh); + buf.putLongLong (enqueueTxStarts); + buf.putLongLong (enqueueTxCommits); + buf.putLongLong (enqueueTxRejects); + buf.putLong (enqueueTxCount); + buf.putLong (enqueueTxCountLow); + buf.putLong (enqueueTxCountHigh); + buf.putLongLong (dequeueTxStarts); + buf.putLongLong (dequeueTxCommits); + buf.putLongLong (dequeueTxRejects); + buf.putLong (dequeueTxCount); + buf.putLong (dequeueTxCountLow); + buf.putLong (dequeueTxCountHigh); + buf.putLong (consumers); + buf.putLong (consumersLow); + buf.putLong (consumersHigh); + + msgDepthLow = msgDepth; + msgDepthHigh = msgDepth; + byteDepthLow = byteDepth; + byteDepthHigh = byteDepth; + enqueueTxCountLow = enqueueTxCount; + enqueueTxCountHigh = enqueueTxCount; + dequeueTxCountLow = dequeueTxCount; + dequeueTxCountHigh = dequeueTxCount; + consumersLow = consumers; + consumersHigh = consumers; +} Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h Wed Oct 31 13:06:05 2007 @@ -0,0 +1,181 @@ +#ifndef _ManagementObjectQueue_ +#define _ManagementObjectQueue_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +namespace qpid { +namespace broker { + +const uint32_t MSG_MASK_TX = 1; // Transactional message +const uint32_t MSG_MASK_PERSIST = 2; // Persistent message + +class ManagementObjectQueue : public ManagementObject +{ + private: + + static bool schemaNeeded; + + std::string objectName; + std::string vhostName; + std::string name; + bool durable; + bool autoDelete; + + uint64_t msgTotalEnqueues; // Total messages enqueued + uint64_t msgTotalDequeues; // Total messages dequeued + uint64_t msgTxEnqueues; // Transactional messages enqueued + uint64_t msgTxDequeues; // Transactional messages dequeued + uint64_t msgPersistEnqueues; // Persistent messages enqueued + uint64_t msgPersistDequeues; // Persistent messages dequeued + + uint32_t msgDepth; // Current size of queue in messages + uint32_t msgDepthLow; // Low-water queue size, this interval + uint32_t msgDepthHigh; // High-water queue size, this interval + + uint64_t byteTotalEnqueues; // Total messages enqueued + uint64_t byteTotalDequeues; // Total messages dequeued + uint64_t byteTxEnqueues; // Transactional messages enqueued + uint64_t byteTxDequeues; // Transactional messages dequeued + uint64_t bytePersistEnqueues; // Persistent messages enqueued + uint64_t bytePersistDequeues; // Persistent messages dequeued + + uint32_t byteDepth; // Current size of queue in bytes + uint32_t byteDepthLow; // Low-water mark this interval + uint32_t byteDepthHigh; // High-water mark this interval + + uint64_t enqueueTxStarts; // Total enqueue transactions started + uint64_t enqueueTxCommits; // Total enqueue transactions committed + uint64_t enqueueTxRejects; // Total enqueue transactions rejected + + uint32_t enqueueTxCount; // Current pending enqueue transactions + uint32_t enqueueTxCountLow; // Low water mark this interval + uint32_t enqueueTxCountHigh; // High water mark this interval + + uint64_t dequeueTxStarts; // Total dequeue transactions started + uint64_t dequeueTxCommits; // Total dequeue transactions committed + uint64_t dequeueTxRejects; // Total dequeue transactions rejected + + uint32_t dequeueTxCount; // Current pending dequeue transactions + uint32_t dequeueTxCountLow; // Low water mark this interval + uint32_t dequeueTxCountHigh; // High water mark this interval + + uint32_t consumers; // Current consumers on queue + uint32_t consumersLow; // Low water mark this interval + uint32_t consumersHigh; // High water mark this interval + + uint16_t getObjectType (void) { return OBJECT_QUEUE; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& buf); + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + + inline void adjustQueueHiLo (void){ + if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; + if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; + + if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; + if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; + instChanged = true; + } + + inline void adjustTxHiLo (void){ + if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; + if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; + if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; + if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; + instChanged = true; + } + + inline void adjustConsumerHiLo (void){ + if (consumers > consumersHigh) consumersHigh = consumers; + if (consumers < consumersLow) consumersLow = consumers; + instChanged = true; + } + + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementObjectQueue (std::string& name, bool durable, bool autoDelete); + ~ManagementObjectQueue (void); + + // The following mask contents are used to describe enqueued or dequeued + // messages when counting statistics. + + inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalEnqueues++; + byteTotalEnqueues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxEnqueues++; + byteTxEnqueues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistEnqueues++; + bytePersistEnqueues += bytes; + } + + msgDepth++; + byteDepth += bytes; + adjustQueueHiLo (); + } + + inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalDequeues++; + byteTotalDequeues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxDequeues++; + byteTxDequeues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistDequeues++; + bytePersistDequeues += bytes; + } + + msgDepth--; + byteDepth -= bytes; + adjustQueueHiLo (); + } + + inline void incConsumers (void){ + consumers++; + adjustConsumerHiLo (); + } + + inline void decConsumers (void){ + consumers--; + adjustConsumerHiLo (); + } +}; + +}} + + + +#endif /*!_ManagementObjectQueue_*/ Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp Wed Oct 31 13:06:05 2007 @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Broker.h" +#include "ManagementObjectVhost.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectVhost::schemaNeeded = true; + +ManagementObjectVhost::ManagementObjectVhost (const Options& /*_conf*/) +{ + name = "/"; +} + +ManagementObjectVhost::~ManagementObjectVhost () {} + +void ManagementObjectVhost::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true, true); + + schemaListEnd (buf); +} + +void ManagementObjectVhost::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (name); +} + Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h?rev=590806&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h Wed Oct 31 13:06:05 2007 @@ -0,0 +1,63 @@ +#ifndef _ManagementObjectVhost_ +#define _ManagementObjectVhost_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include "qpid/Options.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace broker { + +class ManagementObjectVhost : public ManagementObject +{ + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementObjectVhost (const Options& conf); + ~ManagementObjectVhost (void); + + private: + + static bool schemaNeeded; + + std::string objectName; + + std::string name; + + uint16_t getObjectType (void) { return OBJECT_VHOST; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementObjectVhost_*/