qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r586578 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/framing/
Date Fri, 19 Oct 2007 18:57:31 GMT
Author: cctrieloff
Date: Fri Oct 19 11:57:30 2007
New Revision: 586578

URL: http://svn.apache.org/viewvc?rev=586578&view=rev
Log:

QPID-651 applied patch from Ted


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Oct 19 11:57:30 2007
@@ -161,6 +161,9 @@
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/IncomingExecutionContext.cpp \
+  qpid/broker/ManagementAgent.cpp \
+  qpid/broker/ManagementObject.cpp \
+  qpid/broker/ManagementObjectQueue.cpp \
   qpid/broker/Message.cpp \
   qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
@@ -254,6 +257,9 @@
   qpid/broker/HandlerImpl.h \
   qpid/broker/HeadersExchange.h \
   qpid/broker/IncomingExecutionContext.h \
+  qpid/broker/ManagementAgent.h \
+  qpid/broker/ManagementObject.h \
+  qpid/broker/ManagementObjectQueue.h \
   qpid/broker/Message.h \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Oct 19 11:57:30 2007
@@ -60,8 +60,10 @@
     connectionBacklog(10),
     store(),
     stagingThreshold(5000000),
-	storeDir("/var"),
-	storeAsync(false)
+    storeDir("/var"),
+    storeAsync(false),
+    enableMgmt(0),
+    mgmtPubInterval(10)
 {
     addOptions()
         ("port,p", optValue(port,"PORT"),
@@ -79,7 +81,11 @@
         ("store-directory", optValue(storeDir,"DIR"),
          "Store directory location for persistence.")
         ("store-async", optValue(storeAsync,"yes|no"),
-         "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.");
+         "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
+        ("mgmt,m", optValue(enableMgmt,"yes|no"),
+         "Enable Management")
+        ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
+         "Management Publish Interval");
 }
 
 const std::string empty;
@@ -87,6 +93,7 @@
 const std::string amq_topic("amq.topic");
 const std::string amq_fanout("amq.fanout");
 const std::string amq_match("amq.match");
+const std::string qpid_management("qpid.management");
 
 Broker::Broker(const Broker::Options& conf) :
     config(conf),
@@ -96,11 +103,24 @@
     factory(*this),
     dtxManager(store.get())
 {
+    if(conf.enableMgmt){
+	managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
+	queues.setManagementAgent(managementAgent);
+    }
+
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);
     exchanges.declare(amq_topic, TopicExchange::typeName);
     exchanges.declare(amq_fanout, FanOutExchange::typeName);
     exchanges.declare(amq_match, HeadersExchange::typeName);
+    
+    if(conf.enableMgmt) {
+    	QPID_LOG(info, "Management enabled");
+        exchanges.declare(qpid_management, TopicExchange::typeName);
+        managementAgent->setExchange (exchanges.get (qpid_management));
+    }
+    else
+    	QPID_LOG(info, "Management not enabled");
 
     if(store.get()) {
 		store->init(conf.storeDir, conf.storeAsync);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Oct 19 11:57:30 2007
@@ -30,6 +30,7 @@
 #include "MessageStore.h"
 #include "QueueRegistry.h"
 #include "SessionManager.h"
+#include "ManagementAgent.h"
 #include "qpid/Options.h"
 #include "qpid/Plugin.h"
 #include "qpid/Url.h"
@@ -64,8 +65,10 @@
         int connectionBacklog;
         std::string store;      
         long stagingThreshold;
-		string storeDir;
-		bool storeAsync;
+	string storeDir;
+	bool storeAsync;
+	bool enableMgmt;
+	uint16_t mgmtPubInterval;
     };
     
     virtual ~Broker();
@@ -107,6 +110,7 @@
     DtxManager& getDtxManager() { return dtxManager; }
 
     SessionManager& getSessionManager() { return sessionManager; }
+    ManagementAgent::shared_ptr getManagementAgent() { return managementAgent; }
     
   private:
     sys::Acceptor& getAcceptor() const;
@@ -123,6 +127,7 @@
     DtxManager dtxManager;
     HandlerUpdaters handlerUpdaters;
     SessionManager sessionManager;
+    ManagementAgent::shared_ptr managementAgent;
 
     static MessageStore* createStore(const Options& config);
 };

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp?rev=586578&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp Fri Oct 19 11:57:30 2007
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementAgent.h"
+#include "DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include <qpid/framing/AMQFrame.h>
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+{
+    timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
+}
+
+void ManagementAgent::setExchange (Exchange::shared_ptr exchangePtr)
+{
+    exchange = exchangePtr;
+}
+
+void ManagementAgent::addObject (ManagementObject::shared_ptr object)
+{
+    managementObjects.push_back (object);
+    QPID_LOG(info, "Management Object Added");
+}
+
+void ManagementAgent::deleteObject (ManagementObject::shared_ptr object)
+{
+    managementObjects.remove (object);
+    QPID_LOG (debug, "Management Object Removed");
+}
+
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+    : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+
+void ManagementAgent::Periodic::fire ()
+{
+    agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval)));
+    agent.PeriodicProcessing ();
+}
+
+void ManagementAgent::PeriodicProcessing (void)
+{
+#define BUFSIZE   65536
+#define THRESHOLD 16384
+    char      msgChars[BUFSIZE];
+    Buffer    msgBuffer (msgChars, BUFSIZE);
+    uint32_t  contentSize;
+
+    //QPID_LOG (debug, "Timer Fired");
+    if (managementObjects.empty ())
+	return;
+	
+    Message::shared_ptr msg (new Message ());
+
+    // Build the magic number for the management message.
+    msgBuffer.putOctet ('A');
+    msgBuffer.putOctet ('M');
+    msgBuffer.putOctet ('0');
+    msgBuffer.putOctet ('1');
+
+    for (ManagementObjectList::iterator iter = managementObjects.begin ();
+	 iter != managementObjects.end ();
+	 iter++)
+    {
+	ManagementObject::shared_ptr objectPtr = *iter;
+
+	//QPID_LOG (debug, "    Object Found...");
+	
+	if (objectPtr->getSchemaNeeded ())
+	{
+	    //QPID_LOG (debug, "        Generating Schema");
+	    uint32_t startAvail = msgBuffer.available ();
+	    uint32_t recordLength;
+	    
+	    msgBuffer.putOctet ('S');  // opcode = Schema Record
+	    msgBuffer.putOctet (0);    // content-class = N/A
+	    msgBuffer.putShort (objectPtr->getObjectType ());
+	    msgBuffer.record   (); // Record the position of the length field
+	    msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+	    objectPtr->writeSchema (msgBuffer);
+	    recordLength = startAvail - msgBuffer.available ();
+	    msgBuffer.restore (true);         // Restore pointer to length field
+	    msgBuffer.putLong (recordLength);
+	    msgBuffer.restore ();             // Re-restore to get to the end of the buffer
+	}
+
+	if (objectPtr->getConfigChanged ())
+	{
+	    //QPID_LOG (debug, "        Generating Config");
+	    uint32_t startAvail = msgBuffer.available ();
+	    uint32_t recordLength;
+	    
+	    msgBuffer.putOctet ('C');  // opcode = Content Record
+	    msgBuffer.putOctet ('C');  // content-class = Configuration
+	    msgBuffer.putShort (objectPtr->getObjectType ());
+	    msgBuffer.record   (); // Record the position of the length field
+	    msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+	    objectPtr->writeConfig (msgBuffer);
+	    recordLength = startAvail - msgBuffer.available ();
+	    msgBuffer.restore (true);         // Restore pointer to length field
+	    msgBuffer.putLong (recordLength);
+	    msgBuffer.restore ();             // Re-restore to get to the end of the buffer
+	}
+	
+	if (objectPtr->getInstChanged ())
+	{
+	    //QPID_LOG (debug, "        Generating Instrumentation");
+	    uint32_t startAvail = msgBuffer.available ();
+	    uint32_t recordLength;
+	    
+	    msgBuffer.putOctet ('C');  // opcode = Content Record
+	    msgBuffer.putOctet ('I');  // content-class = Instrumentation
+	    msgBuffer.putShort (objectPtr->getObjectType ());
+	    msgBuffer.record   (); // Record the position of the length field
+	    msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+	    objectPtr->writeInstrumentation (msgBuffer);
+	    recordLength = startAvail - msgBuffer.available ();
+	    msgBuffer.restore (true);         // Restore pointer to length field
+	    msgBuffer.putLong (recordLength);
+	    msgBuffer.restore ();             // Re-restore to get to the end of the buffer
+	}
+
+	// Temporary protection against buffer overrun.
+	// This needs to be replaced with frame fragmentation.
+	if (msgBuffer.available () < THRESHOLD)
+	    break;
+    }
+    
+    msgBuffer.putOctet ('X');  // End-of-message
+    msgBuffer.putOctet (0);
+    msgBuffer.putShort (0);
+    msgBuffer.putLong  (8);
+
+    contentSize = BUFSIZE - msgBuffer.available ();
+    msgBuffer.reset ();
+
+    AMQFrame method  (0, MessageTransferBody(ProtocolVersion(),
+				 	     0, "qpid.management", 0, 0));
+    AMQFrame header  (0, AMQHeaderBody());
+    AMQFrame content;
+
+    content.setBody(AMQContentBody());
+    content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
+
+    method.setEof  (false);
+    header.setBof  (false);
+    header.setEof  (false);
+    content.setBof (false);
+
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+
+    MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setContentLength(contentSize);
+    //msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey("mgmt");
+    msg->getFrames().append(content);
+
+    DeliverableMessage deliverable (msg);
+    exchange->route (deliverable, "mgmt", 0);
+}
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h?rev=586578&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h Fri Oct 19 11:57:30 2007
@@ -0,0 +1,70 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "Exchange.h"
+#include "ManagementObject.h"
+#include "Timer.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid { 
+namespace broker {
+
+
+class ManagementAgent
+{
+  public:
+
+    typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+
+    ManagementAgent(uint16_t interval);
+
+    void setExchange  (Exchange::shared_ptr exchangePtr);
+    void addObject    (ManagementObject::shared_ptr object);
+    void deleteObject (ManagementObject::shared_ptr object);
+    
+  private:
+
+    struct Periodic : public TimerTask
+    {
+        ManagementAgent& agent;
+	
+        Periodic (ManagementAgent& agent, uint32_t seconds);
+	~Periodic () {}
+    	void fire ();
+    };
+
+    ManagementObjectList managementObjects;
+    Timer                timer;
+    Exchange::shared_ptr exchange;
+    uint16_t             interval;
+
+    void PeriodicProcessing (void);
+};
+
+}}
+            
+
+
+#endif  /*!_ManagementAgent_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp?rev=586578&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp Fri Oct 19 11:57:30 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementObject.h"
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+
+void ManagementObject::schemaItem (Buffer&     buf,
+				   uint8_t     typeCode,
+				   std::string name,
+				   std::string description,
+				   bool        isConfig)
+{
+    buf.putOctet       (isConfig ? 1 : 0);
+    buf.putOctet       (typeCode);
+    buf.putShortString (name);
+    buf.putShortString (description);
+}
+
+void ManagementObject::schemaListEnd (Buffer& buf)
+{
+    buf.putOctet (0xFF);
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h?rev=586578&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h Fri Oct 19 11:57:30 2007
@@ -0,0 +1,92 @@
+#ifndef _ManagementObject_
+#define _ManagementObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <list>
+
+namespace qpid { 
+namespace broker {
+
+using namespace qpid::framing;
+
+const uint16_t OBJECT_BROKER    = 1;
+const uint16_t OBJECT_SERVER    = 2;
+const uint16_t OBJECT_QUEUE     = 3;
+const uint16_t OBJECT_EXCHANGE  = 4;
+const uint16_t OBJECT_BINDING   = 5;
+
+class ManagementObject
+{
+  private:
+  
+    qpid::sys::AbsTime       createTime;
+    qpid::sys::AbsTime       destroyTime;
+
+  protected:
+    
+    bool  configChanged;
+    bool  instChanged;
+    
+    static const uint8_t TYPE_UINT8  = 1;
+    static const uint8_t TYPE_UINT16 = 2;
+    static const uint8_t TYPE_UINT32 = 3;
+    static const uint8_t TYPE_UINT64 = 4;
+    static const uint8_t TYPE_BOOL   = 5;
+    static const uint8_t TYPE_STRING = 6;
+    
+    void schemaItem (Buffer&     buf,
+		     uint8_t     typeCode,
+		     std::string name,
+		     std::string description,
+		     bool        isConfig = false);
+    void schemaListEnd (Buffer & buf);
+
+  public:
+    typedef boost::shared_ptr<ManagementObject> shared_ptr;
+
+    ManagementObject () : configChanged(true), instChanged(true) { createTime = qpid::sys::now (); }
+    virtual ~ManagementObject () {}
+
+    virtual uint16_t    getObjectType        (void)        = 0;
+    virtual std::string getObjectName        (void)        = 0;
+    virtual void        writeSchema          (Buffer& buf) = 0;
+    virtual void        writeConfig          (Buffer& buf) = 0;
+    virtual void        writeInstrumentation (Buffer& buf) = 0;
+    virtual bool        getSchemaNeeded      (void)        = 0;
+    
+    inline bool getConfigChanged (void) { return configChanged; }
+    inline bool getInstChanged   (void) { return instChanged; }
+    inline void resourceDestroy  (void) { destroyTime = qpid::sys::now (); }
+
+};
+
+ typedef std::list<ManagementObject::shared_ptr> ManagementObjectList;
+
+}}
+            
+
+
+#endif  /*!_ManagementObject_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp?rev=586578&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp Fri Oct 19 11:57:30 2007
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementObjectQueue.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool ManagementObjectQueue::schemaNeeded = true;
+
+ManagementObjectQueue::ManagementObjectQueue (std::string& _name, bool _durable, bool _autoDelete) :
+    name(_name), durable(_durable), autoDelete(_autoDelete)
+{
+    msgTotalEnqueues     = 0;
+    msgTotalDequeues     = 0;
+    msgTxEnqueues        = 0;
+    msgTxDequeues        = 0;
+    msgPersistEnqueues   = 0;
+    msgPersistDequeues   = 0;
+
+    msgDepth             = 0;
+    msgDepthLow          = 0;
+    msgDepthHigh         = 0;
+
+    byteTotalEnqueues    = 0;
+    byteTotalDequeues    = 0;
+    byteTxEnqueues       = 0;
+    byteTxDequeues       = 0;
+    bytePersistEnqueues  = 0;
+    bytePersistDequeues  = 0;
+    
+    byteDepth            = 0;
+    byteDepthLow         = 0;
+    byteDepthHigh        = 0;
+
+    enqueueTxStarts      = 0;
+    enqueueTxCommits     = 0;
+    enqueueTxRejects     = 0;
+    dequeueTxStarts      = 0;
+    dequeueTxCommits     = 0;
+    dequeueTxRejects     = 0;
+    
+    enqueueTxCount       = 0;
+    enqueueTxCountLow    = 0;
+    enqueueTxCountHigh   = 0;
+
+    dequeueTxCount       = 0;
+    dequeueTxCountLow    = 0;
+    dequeueTxCountHigh   = 0;
+
+    consumers            = 0;
+    consumersLow         = 0;
+    consumersHigh        = 0;
+}
+
+ManagementObjectQueue::~ManagementObjectQueue () {}
+
+void ManagementObjectQueue::writeSchema (Buffer& buf)
+{
+    schemaItem (buf, TYPE_STRING, "name",                "Queue Name", true);
+    schemaItem (buf, TYPE_BOOL,   "durable",             "Durable",    true);
+    schemaItem (buf, TYPE_BOOL,   "autoDelete",          "AutoDelete", true);
+
+    schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues",    "Total messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgTotalDequeues",    "Total messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "msgTxEnqueues",       "Transactional messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgTxDequeues",       "Transactional messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues",  "Persistent messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgPersistDequeues",  "Persistent messages dequeued");
+    schemaItem (buf, TYPE_UINT32, "msgDepth",            "Current size of queue in messages");
+    schemaItem (buf, TYPE_UINT32, "msgDepthLow",         "Low-water queue size, this interval");
+    schemaItem (buf, TYPE_UINT32, "msgDepthHigh",        "High-water queue size, this interval");
+    schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues",   "Total messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "byteTotalDequeues",   "Total messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "byteTxEnqueues",      "Transactional messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "byteTxDequeues",      "Transactional messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued");
+    schemaItem (buf, TYPE_UINT32, "byteDepth",           "Current size of queue in bytes");
+    schemaItem (buf, TYPE_UINT32, "byteDepthLow",        "Low-water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "byteDepthHigh",       "High-water mark this interval");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxStarts",     "Total enqueue transactions started ");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxCommits",    "Total enqueue transactions committed");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxRejects",    "Total enqueue transactions rejected");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxCount",      "Current pending enqueue transactions");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow",   "Low water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh",  "High water mark this interval");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxStarts",     "Total dequeue transactions started ");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxCommits",    "Total dequeue transactions committed");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxRejects",    "Total dequeue transactions rejected");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxCount",      "Current pending dequeue transactions");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow",   "Transaction low water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh",  "Transaction high water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "consumers",           "Current consumers on queue");
+    schemaItem (buf, TYPE_UINT32, "consumersLow",        "Consumer low water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "consumersHigh",       "Consumer high water mark this interval");
+
+    schemaListEnd (buf);
+
+    schemaNeeded = false;
+}
+
+void ManagementObjectQueue::writeConfig (Buffer& buf)
+{
+    buf.putShortString (name);
+    buf.putOctet       (durable    ? 1 : 0);
+    buf.putOctet       (autoDelete ? 1 : 0);
+    
+    configChanged = false;
+}
+
+void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
+{
+    buf.putLongLong (msgTotalEnqueues);
+    buf.putLongLong (msgTotalDequeues);
+    buf.putLongLong (msgTxEnqueues);
+    buf.putLongLong (msgTxDequeues);
+    buf.putLongLong (msgPersistEnqueues);
+    buf.putLongLong (msgPersistDequeues);
+    buf.putLong     (msgDepth);
+    buf.putLong     (msgDepthLow);
+    buf.putLong     (msgDepthHigh);
+    buf.putLongLong (byteTotalEnqueues);
+    buf.putLongLong (byteTotalDequeues);
+    buf.putLongLong (byteTxEnqueues);
+    buf.putLongLong (byteTxDequeues);
+    buf.putLongLong (bytePersistEnqueues);
+    buf.putLongLong (bytePersistDequeues);
+    buf.putLong     (byteDepth);
+    buf.putLong     (byteDepthLow);
+    buf.putLong     (byteDepthHigh);
+    buf.putLongLong (enqueueTxStarts);
+    buf.putLongLong (enqueueTxCommits);
+    buf.putLongLong (enqueueTxRejects);
+    buf.putLong     (enqueueTxCount);
+    buf.putLong     (enqueueTxCountLow);
+    buf.putLong     (enqueueTxCountHigh);
+    buf.putLongLong (dequeueTxStarts);
+    buf.putLongLong (dequeueTxCommits);
+    buf.putLongLong (dequeueTxRejects);
+    buf.putLong     (dequeueTxCount);
+    buf.putLong     (dequeueTxCountLow);
+    buf.putLong     (dequeueTxCountHigh);
+    buf.putLong     (consumers);
+    buf.putLong     (consumersLow);
+    buf.putLong     (consumersHigh);
+
+    instChanged = false;
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h?rev=586578&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h Fri Oct 19 11:57:30 2007
@@ -0,0 +1,179 @@
+#ifndef _ManagementObjectQueue_
+#define _ManagementObjectQueue_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+namespace qpid { 
+namespace broker {
+
+const uint32_t MSG_MASK_TX      = 1;  // Transactional message
+const uint32_t MSG_MASK_PERSIST = 2;  // Persistent message
+
+class ManagementObjectQueue : public ManagementObject
+{   // Management-side record of one queue: its identity/config fields plus the running statistics declared below.
+  private:
+
+    static bool schemaNeeded;
+
+    std::string objectName;
+    std::string name;
+    bool        durable;
+    bool        autoDelete;
+  
+    uint64_t  msgTotalEnqueues;     // Total messages enqueued
+    uint64_t  msgTotalDequeues;     // Total messages dequeued
+    uint64_t  msgTxEnqueues;        // Transactional messages enqueued
+    uint64_t  msgTxDequeues;        // Transactional messages dequeued
+    uint64_t  msgPersistEnqueues;   // Persistent messages enqueued
+    uint64_t  msgPersistDequeues;   // Persistent messages dequeued
+    
+    uint32_t  msgDepth;             // Current size of queue in messages
+    uint32_t  msgDepthLow;          // Low-water queue size, this interval
+    uint32_t  msgDepthHigh;         // High-water queue size, this interval
+
+    uint64_t  byteTotalEnqueues;    // Total bytes enqueued
+    uint64_t  byteTotalDequeues;    // Total bytes dequeued
+    uint64_t  byteTxEnqueues;       // Bytes enqueued in transactional messages
+    uint64_t  byteTxDequeues;       // Bytes dequeued in transactional messages
+    uint64_t  bytePersistEnqueues;  // Bytes enqueued in persistent messages
+    uint64_t  bytePersistDequeues;  // Bytes dequeued in persistent messages
+
+    uint32_t  byteDepth;            // Current size of queue in bytes; NOTE(review): 32-bit, yet adjusted by uint64_t byte counts below
+    uint32_t  byteDepthLow;         // Low-water mark this interval
+    uint32_t  byteDepthHigh;        // High-water mark this interval
+    
+    uint64_t  enqueueTxStarts;      // Total enqueue transactions started
+    uint64_t  enqueueTxCommits;     // Total enqueue transactions committed
+    uint64_t  enqueueTxRejects;     // Total enqueue transactions rejected
+    
+    uint32_t  enqueueTxCount;       // Current pending enqueue transactions
+    uint32_t  enqueueTxCountLow;    // Low water mark this interval
+    uint32_t  enqueueTxCountHigh;   // High water mark this interval
+    
+    uint64_t  dequeueTxStarts;      // Total dequeue transactions started
+    uint64_t  dequeueTxCommits;     // Total dequeue transactions committed
+    uint64_t  dequeueTxRejects;     // Total dequeue transactions rejected
+    
+    uint32_t  dequeueTxCount;       // Current pending dequeue transactions
+    uint32_t  dequeueTxCountLow;    // Low water mark this interval
+    uint32_t  dequeueTxCountHigh;   // High water mark this interval
+    
+    uint32_t  consumers;            // Current consumers on queue
+    uint32_t  consumersLow;         // Low water mark this interval
+    uint32_t  consumersHigh;        // High water mark this interval
+
+    uint16_t    getObjectType        (void) { return OBJECT_QUEUE; }
+    std::string getObjectName        (void) { return objectName; }
+    void        writeSchema          (Buffer& buf);
+    void        writeConfig          (Buffer& buf);
+    void        writeInstrumentation (Buffer& buf);
+    bool        getSchemaNeeded      (void) { return schemaNeeded; }
+    
+    inline void adjustQueueHiLo (void){  // Widen the message/byte depth marks to include the current depth; flag a change
+	if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+	if (msgDepth < msgDepthLow)  msgDepthLow  = msgDepth;
+
+	if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+	if (byteDepth < byteDepthLow)  byteDepthLow  = byteDepth;
+	instChanged = true;
+    }
+    
+    inline void adjustTxHiLo (void){  // Same for the pending-transaction counts (no caller inside this header)
+    	if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
+	if (enqueueTxCount < enqueueTxCountLow)  enqueueTxCountLow  = enqueueTxCount;
+    	if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
+	if (dequeueTxCount < dequeueTxCountLow)  dequeueTxCountLow  = dequeueTxCount;
+	instChanged = true;
+    }
+    
+    inline void adjustConsumerHiLo (void){  // Same for the consumer count
+    	if (consumers > consumersHigh) consumersHigh = consumers;
+	if (consumers < consumersLow)  consumersLow  = consumers;
+	instChanged = true;
+    }
+
+  public:
+
+    typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr;
+
+    ManagementObjectQueue  (std::string& name, bool durable, bool autoDelete);  // NOTE(review): non-const reference, so a temporary name cannot be passed
+    ~ManagementObjectQueue (void);
+
+    // In enqueue()/dequeue() below, attrMask is a bit-or of MSG_MASK_TX and MSG_MASK_PERSIST describing
+    // the message being counted; each bit that is set also bumps the matching Tx/Persist counters.
+
+    inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){  // Count one message of `bytes` bytes added to the queue
+    	msgTotalEnqueues++;
+	byteTotalEnqueues += bytes;
+	
+	if (attrMask & MSG_MASK_TX){
+	    msgTxEnqueues++;
+	    byteTxEnqueues += bytes;
+	}
+	
+	if (attrMask & MSG_MASK_PERSIST){
+	    msgPersistEnqueues++;
+	    bytePersistEnqueues += bytes;
+	}
+
+	msgDepth++;
+	byteDepth += bytes;
+	adjustQueueHiLo ();
+    }
+    
+    inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){  // Count one message of `bytes` bytes removed from the queue
+    	msgTotalDequeues++;
+	byteTotalDequeues += bytes;
+
+	if (attrMask & MSG_MASK_TX){
+	    msgTxDequeues++;
+	    byteTxDequeues += bytes;
+	}
+	
+	if (attrMask & MSG_MASK_PERSIST){
+	    msgPersistDequeues++;
+	    bytePersistDequeues += bytes;
+	}
+
+	msgDepth--;          // NOTE(review): unsigned, so this wraps around if the depth is already 0
+	byteDepth -= bytes;  // NOTE(review): same wrap-around risk if bytes exceeds the recorded depth
+	adjustQueueHiLo ();
+    }
+    
+    inline void incConsumers (void){
+    	consumers++;
+	adjustConsumerHiLo ();
+    }
+    
+    inline void decConsumers (void){
+    	consumers--;  // NOTE(review): unsigned, wraps if called with no matching incConsumers()
+	adjustConsumerHiLo ();
+    }
+};
+
+}}
+            
+
+
+#endif  /*!_ManagementObjectQueue_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 19 11:57:30 2007
@@ -77,7 +77,11 @@
         if (!enqueue(0, msg)){
             push(msg);
 	    msg->enqueueComplete();
+	    if (mgmtObjectPtr != 0)
+	        mgmtObjectPtr->enqueue (msg->contentSize ());
 	}else {
+	    if (mgmtObjectPtr != 0)
+	        mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
             push(msg);
 	}
 	QPID_LOG(debug, "Message " << msg << " enqueued on " << name <<
"[" << this << "]");
@@ -89,6 +93,8 @@
 void Queue::recover(Message::shared_ptr& msg){
     push(msg);
     msg->enqueueComplete(); // mark the message as enqueued
+    if (mgmtObjectPtr != 0)
+        mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
     if (store && !msg->isContentLoaded()) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
         //TODO: find a nicer way to do this
@@ -97,8 +103,15 @@
 }
 
 void Queue::process(Message::shared_ptr& msg){
- 
+
+    uint32_t mask = MSG_MASK_TX;
+
+    if (msg->isPersistent ())
+        mask |= MSG_MASK_PERSIST;
+
     push(msg);
+    if (mgmtObjectPtr != 0)
+        mgmtObjectPtr->enqueue (msg->contentSize (), mask);
     serializer.execute(dispatchCallback);
    
 }
@@ -267,6 +280,14 @@
     if(!messages.empty()){
         msg = messages.front();
         pop();
+	if (mgmtObjectPtr != 0){
+	    uint32_t mask = 0;
+
+	    if (msg.payload->isPersistent ())
+	        mask |= MSG_MASK_PERSIST;
+
+	    mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
+	}
     }
     return msg;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 19 11:57:30 2007
@@ -35,6 +35,7 @@
 #include "PersistableQueue.h"
 #include "QueuePolicy.h"
 #include "QueueBindings.h"
+#include "ManagementObjectQueue.h"
 
 namespace qpid {
     namespace broker {
@@ -93,6 +94,7 @@
             qpid::sys::Serializer<DispatchFunctor> serializer;
             DispatchFunctor dispatchCallback;
             framing::SequenceNumber sequence;
+	    ManagementObjectQueue::shared_ptr mgmtObjectPtr;
 
             void pop();
             void push(Message::shared_ptr& msg);
@@ -130,6 +132,8 @@
             void destroy();
             void bound(const string& exchange, const string& key, const qpid::framing::FieldTable&
args);
             void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
+            void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObjectPtr = mgmt;
}
+            ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObjectPtr; }
 
             bool acquire(const QueuedMessage& msg);
 
@@ -158,7 +162,7 @@
              * Request dispatch any queued messages providing there are
              * consumers for them. Only one thread can be dispatching
              * at any time, so this call schedules the despatch based on
-	     * the serilizer policy.
+	         * the serilizer policy.
              */
             void requestDispatch(Consumer::ptr c = Consumer::ptr());
             void flush(DispatchCompletion& callback);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Oct 19 11:57:30 2007
@@ -19,13 +19,16 @@
  *
  */
 #include "QueueRegistry.h"
+#include "ManagementAgent.h"
+#include "ManagementObjectQueue.h"
 #include <sstream>
 #include <assert.h>
 
 using namespace qpid::broker;
 using namespace qpid::sys;
 
-QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
+QueueRegistry::QueueRegistry(MessageStore* const _store) :
+    counter(1), store(_store) {}
 
 QueueRegistry::~QueueRegistry(){}
 
@@ -37,9 +40,18 @@
     string name = declareName.empty() ? generateName() : declareName;
     assert(!name.empty());
     QueueMap::iterator i =  queues.find(name);
+
     if (i == queues.end()) {
 	Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
 	queues[name] = queue;
+
+	if (managementAgent){
+	    ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable,
autoDelete));
+
+	    queue->setMgmt (mgmtObject);
+	    managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
+	}
+	
 	return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {
 	return std::pair<Queue::shared_ptr, bool>(i->second, false);
@@ -48,12 +60,24 @@
 
 void QueueRegistry::destroy(const string& name){
     RWlock::ScopedWlock locker(lock);
+    // With an agent attached, pass the queue's management object to deleteObject() before the queue is erased.
+    if (managementAgent){
+	ManagementObjectQueue::shared_ptr mgmtObject;
+	QueueMap::iterator i = queues.find(name);
+
+	if (i != queues.end()){
+	    mgmtObject = i->second->getMgmt ();  // NOTE(review): empty if setMgmt() was never called on this queue - confirm deleteObject() tolerates that
+	    managementAgent->deleteObject (dynamic_pointer_cast<ManagementObject>(mgmtObject));
+	}
+    }
+
     queues.erase(name);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){
     RWlock::ScopedRlock locker(lock);
     QueueMap::iterator i = queues.find(name);
+    
     if (i == queues.end()) {
 	return Queue::shared_ptr();
     } else {
@@ -76,3 +100,14 @@
 MessageStore* const QueueRegistry::getStore() const {
     return store;
 }
+
+void QueueRegistry::setManagementAgent (ManagementAgent::shared_ptr agent)
+{   // Stores the agent; the registry only adds/deletes queue management objects while this pointer is non-null.
+    managementAgent = agent;
+}
+
+ManagementAgent::shared_ptr QueueRegistry::getManagementAgent (void)
+{   // Returns the stored agent pointer as-is; nothing in this file assigns it except setManagementAgent().
+    return managementAgent;
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Fri Oct 19 11:57:30 2007
@@ -24,6 +24,7 @@
 #include <map>
 #include "qpid/sys/Mutex.h"
 #include "Queue.h"
+#include "ManagementAgent.h"
 
 namespace qpid {
 namespace broker {
@@ -87,6 +88,12 @@
      * Return the message store used.
      */
     MessageStore* const getStore() const;
+    
+    /**
+     * Set/Get the ManagementAgent in use.
+     */
+    void setManagementAgent (ManagementAgent::shared_ptr agent);
+    ManagementAgent::shared_ptr getManagementAgent (void);
 
 private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
@@ -94,6 +101,7 @@
     qpid::sys::RWlock lock;
     int counter;
     MessageStore* const store;
+    ManagementAgent::shared_ptr managementAgent;
 };
 
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Fri Oct 19 11:57:30 2007
@@ -34,8 +34,17 @@
     r_position = position;
 }
 
-void Buffer::restore(){
+void Buffer::restore(bool reRecord){
+    uint32_t savedPosition = position;
+
     position = r_position;
+
+    if (reRecord)
+	r_position = savedPosition;
+}
+
+void Buffer::reset(){
+    position = 0;
 }
 
 uint32_t Buffer::available(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?rev=586578&r1=586577&r2=586578&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Fri Oct 19 11:57:30 2007
@@ -41,7 +41,8 @@
     Buffer(char* data, uint32_t size);
 
     void record();
-    void restore();
+    void restore(bool reRecord = false);
+    void reset();
     uint32_t available();
     
     void putOctet(uint8_t i);



Mime
View raw message