qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r608135 - in /incubator/qpid/trunk/qpid: cpp/managementgen/ cpp/src/qpid/broker/ cpp/src/qpid/management/ cpp/src/qpid/sys/ python/mgmt-cli/ python/qpid/ specs/
Date Wed, 02 Jan 2008 15:56:22 GMT
Author: cctrieloff
Date: Wed Jan  2 07:56:20 2008
New Revision: 608135

URL: http://svn.apache.org/viewvc?rev=608135&view=rev
Log:
patch-715  (tross)


Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/main.py
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
    incubator/qpid/trunk/qpid/python/mgmt-cli/main.py
    incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml
    incubator/qpid/trunk/qpid/specs/management-types.xml

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/main.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/main.py?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/main.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/main.py Wed Jan  2 07:56:20 2008
@@ -41,8 +41,7 @@
 if opts.outdir      == None or \
    opts.typefile    == None or \
    opts.schemafile  == None or \
-   opts.templatedir == None or \
-   opts.makefile    == None:
+   opts.templatedir == None:
   parser.error ("Incorrect options, see --help for help")
 
 gen    = Generator     (opts.outdir,   opts.templatedir)
@@ -51,4 +50,6 @@
 gen.makeClassFiles  ("Class.h",   schema)
 gen.makeClassFiles  ("Class.cpp", schema)
 gen.makeMethodFiles ("Args.h",    schema)
-gen.makeMakeFile    (opts.makefile)
+
+if opts.makefile != None:
+  gen.makeMakeFile (opts.makefile)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jan  2 07:56:20 2008
@@ -231,6 +231,7 @@
 void Broker::shutdown() {
     if (acceptor)
         acceptor->shutdown();
+    ManagementAgent::shutdown ();
 }
 
 Broker::~Broker() {
@@ -268,6 +269,11 @@
 ManagementObject::shared_ptr Broker::GetManagementObject(void) const
 {
     return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable* Broker::GetVhostObject(void) const
+{
+    return vhostObject.get();
 }
 
 Manageable::status_t Broker::ManagementMethod (uint32_t methodId,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Jan  2 07:56:20 2008
@@ -119,6 +119,7 @@
     SessionManager& getSessionManager() { return sessionManager; }
 
     management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::Manageable*                  GetVhostObject      (void) const;
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jan  2 07:56:20 2008
@@ -26,6 +26,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/management/ManagementAgent.h"
 
 #include <boost/bind.hpp>
 
@@ -38,11 +39,15 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace qpid::ptr_map;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
 
 namespace qpid {
 namespace broker {
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
+    Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const Socket& s) :
     broker(broker_),
     outputTasks(*out_),
     out(out_),
@@ -50,15 +55,45 @@
     heartbeat(0),
     client(0),
     stagingThreshold(broker.getStagingThreshold()),
-    adapter(*this)
-{}
+    adapter(*this),
+    mgmtClosing(0)
+{
+    Manageable* parent = broker.GetVhostObject ();
+
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+        if (agent.get () != 0)
+        {
+            mgmtObject = management::Client::shared_ptr
+                (new management::Client (this, parent, s.getPeerAddress ()));
+            agent->addObject (mgmtObject);
+        }
+    }
+}
+
+Connection::~Connection ()
+{
+    if (mgmtObject.get () != 0)
+        mgmtObject->resourceDestroy ();
+}
 
 void Connection::received(framing::AMQFrame& frame){
+    if (mgmtClosing)
+        close (403, "Closed by Management Request", 0, 0);
+
     if (frame.getChannel() == 0) {
         adapter.handle(frame);
     } else {
         getChannel(frame.getChannel()).in(frame);
     }
+
+    if (mgmtObject.get () != 0)
+    {
+        mgmtObject->inc_framesFromClient ();
+        mgmtObject->inc_bytesFromClient (frame.size ());
+    }
 }
 
 void Connection::close(
@@ -121,6 +156,31 @@
     }
     return *get_pointer(i);
 }
+
+ManagementObject::shared_ptr Connection::GetManagementObject (void) const
+{
+    return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
+                                                   Args&    /*args*/)
+{
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+    QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]");
+
+    switch (methodId)
+    {
+    case management::Client::METHOD_CLOSE :
+        mgmtClosing = 1;
+        mgmtObject->set_closing (1);
+        status = Manageable::STATUS_OK;
+        break;
+    }
+
+    return status;
+}
+
 
 }}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Jan  2 07:56:20 2008
@@ -35,9 +35,12 @@
 #include "qpid/sys/TimeoutHandler.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "Broker.h"
+#include "qpid/sys/Socket.h"
 #include "qpid/Exception.h"
 #include "ConnectionHandler.h"
 #include "SessionHandler.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Client.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 
@@ -45,10 +48,12 @@
 namespace broker {
 
 class Connection : public sys::ConnectionInputHandler, 
-                   public ConnectionToken
+                   public ConnectionToken,
+                   public management::Manageable
 {
   public:
-    Connection(sys::ConnectionOutputHandler* out, Broker& broker);
+    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const sys::Socket& s);
+    ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already exist */
     SessionHandler& getChannel(framing::ChannelId channel);
@@ -85,6 +90,11 @@
 
     void closeChannel(framing::ChannelId channel);
 
+    // Manageable entry points
+    management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::Manageable::status_t
+        ManagementMethod (uint32_t methodId, management::Args& args);
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -97,6 +107,8 @@
     framing::AMQP_ClientProxy::Connection* client;
     uint64_t stagingThreshold;
     ConnectionHandler adapter;
+    management::Client::shared_ptr mgmtObject;
+    bool mgmtClosing;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Wed Jan  2 07:56:20 2008
@@ -35,9 +35,10 @@
 }
 
 qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
+ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
+                          const qpid::sys::Socket& s)
 {
-    return new Connection(out, broker);
+    return new Connection(out, broker, s);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Wed Jan  2 07:56:20 2008
@@ -32,8 +32,8 @@
   public:
     ConnectionFactory(Broker& b);
             
-    virtual qpid::sys::ConnectionInputHandler* create(
-        qpid::sys::ConnectionOutputHandler* ctxt);
+    virtual qpid::sys::ConnectionInputHandler* create
+        (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s);
             
     virtual ~ConnectionFactory();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jan  2 07:56:20 2008
@@ -616,10 +616,6 @@
         purge ();
         status = Manageable::STATUS_OK;
         break;
-
-    case management::Queue::METHOD_INCREASEJOURNALSIZE :
-        status = Manageable::STATUS_NOT_IMPLEMENTED;
-        break;
     }
 
     return status;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Wed Jan  2 07:56:20 2008
@@ -58,6 +58,8 @@
     active.erase(session->getId());
     session->suspend();
     session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+    if (session->mgmtObject.get() != 0)
+        session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
     suspended.push_back(session.release()); // In expiry order
     eraseExpired();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Jan  2 07:56:20 2008
@@ -31,6 +31,10 @@
 
 using namespace framing;
 using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
 
 void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
 
@@ -50,11 +54,34 @@
     // TODO aconway 2007-09-20: SessionManager may add plugin
     // handlers to the chain.
     getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+    Manageable* parent = broker.GetVhostObject ();
+
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+        if (agent.get () != 0)
+        {
+            mgmtObject = management::Session::shared_ptr
+                (new management::Session (this, parent, id.str ()));
+            mgmtObject->set_attached (1);
+            mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+            mgmtObject->set_channelId (h.getChannel());
+            mgmtObject->set_detachedLifespan (getTimeout());
+            agent->addObject (mgmtObject);
+        }
+    }
 }
 
 SessionState::~SessionState() {
     // Remove ID from active session list.
     factory.erase(getId());
+
+    if (mgmtObject.get () != 0)
+    {
+        mgmtObject->resourceDestroy ();
+    }
 }
 
 SessionHandler* SessionState::getHandler() {
@@ -75,12 +102,22 @@
     getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
     Mutex::ScopedLock l(lock);
     handler = 0;
+    if (mgmtObject.get() != 0)
+    {
+        mgmtObject->set_attached  (0);
+    }
 }
 
 void SessionState::attach(SessionHandler& h) {
     {
         Mutex::ScopedLock l(lock);
         handler = &h;
+        if (mgmtObject.get() != 0)
+        {
+            mgmtObject->set_attached (1);
+            mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+            mgmtObject->set_channelId (h.getChannel());
+        }
     }
     h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
 }
@@ -95,5 +132,43 @@
     //This class could be used as the callback for queue notifications
     //if not attached, it can simply ignore the callback, else pass it
     //on to the connection
+
+ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
+{
+    return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
+                                                     Args&    /*args*/)
+{
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+    switch (methodId)
+    {
+    case management::Session::METHOD_DETACH :
+        if (handler != 0)
+        {
+            handler->localSuspend ();
+        }
+        status = Manageable::STATUS_OK;
+        break;
+
+    case management::Session::METHOD_CLOSE :
+        if (handler != 0)
+        {
+            handler->getConnection().closeChannel(handler->getChannel());
+        }
+        status = Manageable::STATUS_OK;
+        break;
+
+    case management::Session::METHOD_SOLICITACK :
+    case management::Session::METHOD_RESETLIFESPAN :
+        status = Manageable::STATUS_NOT_IMPLEMENTED;
+        break;
+    }
+
+    return status;
+}
+
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Jan  2 07:56:20 2008
@@ -29,6 +29,8 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -57,7 +59,8 @@
  */
 class SessionState : public framing::SessionState,
     public framing::FrameHandler::InOutHandler,
-    public sys::OutputControl
+    public sys::OutputControl,
+    public management::Manageable
 {
   public:
     ~SessionState();
@@ -82,6 +85,11 @@
     /** OutputControl **/
     void activateOutput();
 
+    // Manageable entry points
+    management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::Manageable::status_t
+        ManagementMethod (uint32_t methodId, management::Args& args);
+
   protected:
     void handleIn(framing::AMQFrame&);
     void handleOut(framing::AMQFrame&);
@@ -102,6 +110,7 @@
     framing::ProtocolVersion version;
     sys::Mutex lock;
     boost::scoped_ptr<SemanticHandler> semanticHandler;
+    management::Session::shared_ptr mgmtObject;
 
   friend class SessionManager;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Jan  2 07:56:20 2008
@@ -41,6 +41,8 @@
     nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
 }
 
+ManagementAgent::~ManagementAgent () {}
+
 void ManagementAgent::enableManagement (void)
 {
     enabled = 1;
@@ -54,6 +56,16 @@
     return agent;
 }
 
+void ManagementAgent::shutdown (void)
+{
+    if (agent.get () != 0)
+    {
+        agent->mExchange.reset ();
+        agent->dExchange.reset ();
+        agent.reset ();
+    }
+}
+
 void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
                                    broker::Exchange::shared_ptr _dexchange)
 {
@@ -73,6 +85,8 @@
 ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
     : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
 
+ManagementAgent::Periodic::~Periodic () {}
+
 void ManagementAgent::Periodic::fire ()
 {
     agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
@@ -93,12 +107,27 @@
     }
 }
 
-void ManagementAgent::EncodeHeader (Buffer& buf)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
 {
     buf.putOctet ('A');
     buf.putOctet ('M');
     buf.putOctet ('0');
     buf.putOctet ('1');
+    buf.putOctet (opcode);
+    buf.putOctet (cls);
+}
+
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+{
+    uint8_t h1 = buf.getOctet ();
+    uint8_t h2 = buf.getOctet ();
+    uint8_t h3 = buf.getOctet ();
+    uint8_t h4 = buf.getOctet ();
+
+    *opcode = buf.getOctet ();
+    *cls    = buf.getOctet ();
+
+    return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
 }
 
 void ManagementAgent::SendBuffer (Buffer&  buf,
@@ -112,9 +141,6 @@
     AMQFrame header (in_place<AMQHeaderBody>());
     AMQFrame content(in_place<AMQContentBody>());
 
-    QPID_LOG (debug, "ManagementAgent::SendBuffer - key="
-              << routingKey << " len=" << length);
-
     content.castBody<AMQContentBody>()->decode(buf, length);
 
     method.setEof  (false);
@@ -156,9 +182,7 @@
         if (object->getSchemaNeeded ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer);
-            msgBuffer.putOctet ('S');  // opcode = Schema Record
-            msgBuffer.putOctet (0);    // content-class = N/A
+            EncodeHeader (msgBuffer, 'S');
             object->writeSchema (msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
@@ -170,9 +194,7 @@
         if (object->getConfigChanged () || object->isDeleted ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer);
-            msgBuffer.putOctet ('C');  // opcode = Content Record
-            msgBuffer.putOctet ('C');  // content-class = Configuration
+            EncodeHeader (msgBuffer, 'C', 'C');
             object->writeConfig (msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
@@ -184,9 +206,7 @@
         if (object->getInstChanged ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer);
-            msgBuffer.putOctet ('C');  // opcode = Content Record
-            msgBuffer.putOctet ('I');  // content-class = Instrumentation
+            EncodeHeader (msgBuffer, 'C', 'I');
             object->writeInstrumentation (msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
@@ -251,9 +271,6 @@
     start = pos + 1;
     string methodName = routingKey.substr (start, routingKey.length () - start);
 
-    QPID_LOG (debug, "Dispatch package: " << packageName << ", class: "
-              << className << ", method: " << methodName);
-
     contentSize = msg.encodedContentSize ();
     if (contentSize < 8 || contentSize > 65536)
         return;
@@ -263,19 +280,41 @@
     Buffer   inBuffer  (inMem,  contentSize);
     Buffer   outBuffer (outMem, 4096);
     uint32_t outLen;
+    uint8_t  opcode, unused;
 
     msg.encodeContent (inBuffer);
     inBuffer.reset ();
 
-    uint32_t methodId = inBuffer.getLong     ();
-    uint64_t objId    = inBuffer.getLongLong ();
-    string   replyTo;
+    if (!CheckHeader (inBuffer, &opcode, &unused))
+    {
+        QPID_LOG (debug, "    Invalid content header");
+        return;
+    }
 
-    inBuffer.getShortString (replyTo);
+    if (opcode != 'M')
+    {
+        QPID_LOG (debug, "    Unexpected opcode " << opcode);
+        return;
+    }
 
-    QPID_LOG (debug, "    len = " << contentSize << ", methodId = " <<
-              methodId << ", objId = " << objId);
+    uint32_t   methodId = inBuffer.getLong     ();
+    uint64_t   objId    = inBuffer.getLongLong ();
+    string     replyToKey;
+
+    const framing::MessageProperties* p =
+        msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+    if (p && p->hasReplyTo())
+    {
+        const framing::ReplyTo& rt = p->getReplyTo ();
+        replyToKey = rt.getRoutingKey ();
+    }
+    else
+    {
+        QPID_LOG (debug, "    Reply-to missing");
+        return;
+    }
 
+    EncodeHeader (outBuffer, 'R');
     outBuffer.putLong (methodId);
 
     ManagementObjectMap::iterator iter = managementObjects.find (objId);
@@ -291,7 +330,7 @@
 
     outLen = 4096 - outBuffer.available ();
     outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, dExchange, replyTo);
+    SendBuffer (outBuffer, outLen, dExchange, replyToKey);
     free (inMem);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Jan  2 07:56:20 2008
@@ -40,10 +40,13 @@
 
   public:
 
+    virtual ~ManagementAgent ();
+
     typedef boost::shared_ptr<ManagementAgent> shared_ptr;
 
     static void       enableManagement (void);
     static shared_ptr getAgent (void);
+    static void       shutdown (void);
 
     void setInterval     (uint16_t _interval) { interval = _interval; }
     void setExchange     (broker::Exchange::shared_ptr mgmtExchange,
@@ -61,7 +64,7 @@
         ManagementAgent& agent;
 
         Periodic (ManagementAgent& agent, uint32_t seconds);
-        ~Periodic () {}
+        virtual ~Periodic ();
         void fire ();
     };
 
@@ -77,7 +80,8 @@
     uint64_t                     nextObjectId;
 
     void PeriodicProcessing (void);
-    void EncodeHeader       (qpid::framing::Buffer&       buf);
+    void EncodeHeader       (qpid::framing::Buffer& buf, uint8_t  opcode, uint8_t  cls = 0);
+    bool CheckHeader        (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
     void SendBuffer         (qpid::framing::Buffer&       buf,
                              uint32_t                     length,
                              broker::Exchange::shared_ptr exchange,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h Wed Jan  2 07:56:20 2008
@@ -40,7 +40,7 @@
                         const qpid::framing::FieldTable& _args,
                         Manageable* _parent = 0);
 
-    virtual std::string getType() const { return typeName; }            
+    virtual std::string getType() const { return typeName; }
 
     virtual bool bind (Queue::shared_ptr queue,
                        const string&     routingKey,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Wed Jan  2 07:56:20 2008
@@ -46,12 +46,16 @@
     Manageable* coreObject;
     std::string className;
     
-    static const uint8_t TYPE_U8   = 1;
-    static const uint8_t TYPE_U16  = 2;
-    static const uint8_t TYPE_U32  = 3;
-    static const uint8_t TYPE_U64  = 4;
-    static const uint8_t TYPE_SSTR = 6;
-    static const uint8_t TYPE_LSTR = 7;
+    static const uint8_t TYPE_U8        = 1;
+    static const uint8_t TYPE_U16       = 2;
+    static const uint8_t TYPE_U32       = 3;
+    static const uint8_t TYPE_U64       = 4;
+    static const uint8_t TYPE_SSTR      = 6;
+    static const uint8_t TYPE_LSTR      = 7;
+    static const uint8_t TYPE_ABSTIME   = 8;
+    static const uint8_t TYPE_DELTATIME = 9;
+    static const uint8_t TYPE_REF       = 10;
+    static const uint8_t TYPE_BOOL      = 11;
 
     static const uint8_t ACCESS_RC = 1;
     static const uint8_t ACCESS_RW = 2;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Wed Jan  2 07:56:20 2008
@@ -131,7 +131,7 @@
 void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
 
 	AsynchIOHandler* async = new AsynchIOHandler; 
-	ConnectionInputHandler* handler = f->create(async);
+	ConnectionInputHandler* handler = f->create(async, s);
     AsynchIO* aio = new AsynchIO(s,
     	boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
     	boost::bind(&AsynchIOHandler::eof, async, _1),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h Wed Jan  2 07:56:20 2008
@@ -22,6 +22,7 @@
 #define _ConnectionInputHandlerFactory_
 
 #include <boost/noncopyable.hpp>
+#include "qpid/sys/Socket.h"
 
 namespace qpid {
 namespace sys {
@@ -36,7 +37,7 @@
 class ConnectionInputHandlerFactory : private boost::noncopyable
 {
   public:
-    virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0;
+    virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0;
     virtual ~ConnectionInputHandlerFactory(){}
 };
 

Modified: incubator/qpid/trunk/qpid/python/mgmt-cli/main.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/main.py?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/main.py (original)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/main.py Wed Jan  2 07:56:20 2008
@@ -49,6 +49,7 @@
     print "    list <className>                - Print list of objects of the specified class"
     print "    list <className> all            - Print contents of all objects of specified class"
     print "    list <className> active         - Print contents of all non-deleted objects of specified class"
+    print "    list <list-of-IDs>              - Print contents of one or more objects (infer className)"
     print "    list <className> <list-of-IDs>  - Print contents of one or more objects"
     print "        list is space-separated, ranges may be specified (i.e. 1004-1010)"
     print "    call <ID> <methodName> [<args>] - Invoke a method on an object"

Modified: incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py Wed Jan  2 07:56:20 2008
@@ -118,6 +118,7 @@
     self.schema         = {}
     self.baseId         = 0
     self.disp           = disp
+    self.lastUnit       = None
     self.methodSeq      = 1
     self.methodsPending = {}
     self.broker.start ()
@@ -125,6 +126,44 @@
   def close (self):
     self.broker.stop ()
 
+  def refName (self, oid):
+    if oid == 0:
+      return "NULL"
+    return str (oid - self.baseId)
+
+  def valueDisplay (self, className, key, value):
+    for kind in range (2):
+      schema = self.schema[className][kind]
+      for item in schema:
+        if item[0] == key:
+          typecode = item[1]
+          unit     = item[2]
+          if typecode >= 1 and typecode <= 5:  # numerics
+            if unit == None or unit == self.lastUnit:
+              return str (value)
+            else:
+              self.lastUnit = unit
+              suffix = ""
+              if value != 1:
+                suffix = "s"
+              return str (value) + " " + unit + suffix
+          elif typecode == 6 or typecode == 7: # strings
+            return value
+          elif typecode == 8:
+            if value == 0:
+              return "--"
+            return self.disp.timestamp (value)
+          elif typecode == 9:
+            return str (value)
+          elif typecode == 10:
+            return self.refName (value)
+          elif typecode == 11:
+            if value == 0:
+              return "False"
+            else:
+              return "True"
+    return "*type-error*"
+
   def getObjIndex (self, className, config):
     """ Concatenate the values from index columns to form a unique object name """
     result = ""
@@ -135,9 +174,7 @@
           result = result + "."
         for key,val in config:
           if key == item[0]:
-            if key.find ("Ref") != -1:
-              val = val - self.baseId
-            result = result + str (val)
+            result = result + self.valueDisplay (className, key, val)
     return result
 
   def classCompletions (self, prefix):
@@ -168,6 +205,14 @@
       return "short-string"
     elif typecode == 7:
       return "long-string"
+    elif typecode == 8:
+      return "abs-time"
+    elif typecode == 9:
+      return "delta-time"
+    elif typecode == 10:
+      return "reference"
+    elif typecode == 11:
+      return "boolean"
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
@@ -180,7 +225,7 @@
     elif code == 3:
       return "ReadOnly"
     else:
-      raise ValueErrir ("Invalid access code: %d" %code)
+      raise ValueError ("Invalid access code: %d" %code)
 
   def notNone (self, text):
     if text == None:
@@ -188,6 +233,12 @@
     else:
       return text
 
+  def isOid (self, id):
+    for char in str (id):
+      if not char.isdigit () and not char == '-':
+        return False
+    return True
+
   def listOfIds (self, className, tokens):
     """ Generate a tuple of object ids for a classname based on command tokens. """
     list = []
@@ -202,13 +253,14 @@
 
     else:
       for token in tokens:
-        if token.find ("-") != -1:
-          ids = token.split("-", 2)
-          for id in range (int (ids[0]), int (ids[1]) + 1):
-            if self.getClassForId (long (id) + self.baseId) == className:
-              list.append (id)
-        else:
-          list.append (token)
+        if self.isOid (token):
+          if token.find ("-") != -1:
+            ids = token.split("-", 2)
+            for id in range (int (ids[0]), int (ids[1]) + 1):
+              if self.getClassForId (long (id) + self.baseId) == className:
+                list.append (id)
+          else:
+            list.append (token)
 
     list.sort ()
     result = ()
@@ -258,7 +310,7 @@
           if ts[2] > 0:
             destroyTime = self.disp.timestamp (ts[2])
           objIndex = self.getObjIndex (className, config)
-          row = (objId - self.baseId, createTime, destroyTime, objIndex)
+          row = (self.refName (objId), createTime, destroyTime, objIndex)
           rows.append (row)
         self.disp.table ("Objects of type %s" % className,
                          ("ID", "Created", "Destroyed", "Index"),
@@ -270,12 +322,26 @@
     """ Generate a display of object data for a particular class """
     self.lock.acquire ()
     try:
-      className = tokens[0]
-      if className not in self.tables:
-        print "Class not known: %s" % className
-        raise ValueError ()
-        
-      userIds = self.listOfIds (className, tokens[1:])
+      self.lastUnit = None
+      if self.isOid (tokens[0]):
+        if tokens[0].find ("-") != -1:
+          rootId = int (tokens[0][0:tokens[0].find ("-")])
+        else:
+          rootId = int (tokens[0])
+
+        className = self.getClassForId (rootId + self.baseId)
+        remaining = tokens
+        if className == None:
+          print "Id not known: %d" % int (tokens[0])
+          raise ValueError ()
+      else:
+        className = tokens[0]
+        remaining = tokens[1:]
+        if className not in self.tables:
+          print "Class not known: %s" % className
+          raise ValueError ()
+
+      userIds = self.listOfIds (className, remaining)
       if len (userIds) == 0:
         print "No object IDs supplied"
         raise ValueError ()
@@ -286,36 +352,37 @@
           ids.append (long (id) + self.baseId)
 
       rows = []
+      timestamp = None
       config = self.tables[className][ids[0]][1]
       for eIdx in range (len (config)):
         key = config[eIdx][0]
         if key != "id":
-          isRef = key.find ("Ref") == len (key) - 3
           row   = ("config", key)
           for id in ids:
-            value = self.tables[className][id][1][eIdx][1]
-            if isRef:
-              value = value - self.baseId
-            row = row + (value,)
+            if timestamp == None or \
+               timestamp < self.tables[className][id][0][0]:
+              timestamp = self.tables[className][id][0][0]
+            (key, value) = self.tables[className][id][1][eIdx]
+            row = row + (self.valueDisplay (className, key, value),)
           rows.append (row)
 
       inst = self.tables[className][ids[0]][2]
       for eIdx in range (len (inst)):
         key = inst[eIdx][0]
         if key != "id":
-          isRef = key.find ("Ref") == len (key) - 3
           row = ("inst", key)
           for id in ids:
-            value = self.tables[className][id][2][eIdx][1]
-            if isRef:
-              value = value - self.baseId
-            row = row + (value,)
+            (key, value) = self.tables[className][id][2][eIdx]
+            row = row + (self.valueDisplay (className, key, value),)
           rows.append (row)
 
       titleRow = ("Type", "Element")
       for id in ids:
-        titleRow = titleRow + (str (id - self.baseId),)
-      self.disp.table ("Object of type %s:" % className, titleRow, rows)
+        titleRow = titleRow + (self.refName (id),)
+      caption = "Object of type %s:" % className
+      if timestamp != None:
+        caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
+      self.disp.table (caption, titleRow, rows)
 
     except:
       pass
@@ -418,6 +485,10 @@
       if className == None:
         raise ValueError ()
 
+      if methodName not in self.schema[className][2]:
+        print "Method '%s' not valid for class '%s'" % (methodName, className)
+        raise ValueError ()
+
       schemaMethod = self.schema[className][2][methodName]
       if len (args) != len (schemaMethod[1]):
         print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args))
@@ -431,17 +502,19 @@
       self.methodsPending[self.methodSeq] = methodName
     except:
       methodOk = False
-      print "Error in call syntax"
     self.lock.release ()
     if methodOk:
-      self.broker.method (self.methodSeq, userOid + self.baseId, className,
-                          methodName, namedArgs)
+#      try:
+        self.broker.method (self.methodSeq, userOid + self.baseId, className,
+                            methodName, namedArgs)
+#      except ValueError, e:
+#        print "Error invoking method:", e
 
   def do_list (self, data):
     tokens = data.split ()
     if len (tokens) == 0:
       self.listClasses ()
-    elif len (tokens) == 1:
+    elif len (tokens) == 1 and not self.isOid (tokens[0]):
       self.listObjects (data)
     else:
       self.showObjects (tokens)

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Wed Jan  2 07:56:20 2008
@@ -77,6 +77,14 @@
       codec.encode_shortstr (value)
     elif typecode == 7:
       codec.encode_longstr  (value)
+    elif typecode == 8:  # ABSTIME
+      codec.encode_longlong (long (value))
+    elif typecode == 9:  # DELTATIME
+      codec.encode_longlong (long (value))
+    elif typecode == 10: # REF
+      codec.encode_longlong (long (value))
+    elif typecode == 11: # BOOL
+      codec.encode_octet    (int  (value))
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
@@ -95,6 +103,14 @@
       data = codec.decode_shortstr ()
     elif typecode == 7:
       data = codec.decode_longstr ()
+    elif typecode == 8:  # ABSTIME
+      data = codec.decode_longlong ()
+    elif typecode == 9:  # DELTATIME
+      data = codec.decode_longlong ()
+    elif typecode == 10: # REF
+      data = codec.decode_longlong ()
+    elif typecode == 11: # BOOL
+      data = codec.decode_octet ()
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
     return data
@@ -236,10 +252,7 @@
     elif cls == 'I':
       self.broker.inst_cb[1]   (self.broker.inst_cb[0], className, row, timestamps)
 
-  def parse (self, codec):
-    opcode = chr (codec.decode_octet ())
-    cls    = chr (codec.decode_octet ())
-
+  def parse (self, codec, opcode, cls):
     if opcode == 'S':
       self.parseSchema (cls, codec)
 
@@ -261,34 +274,53 @@
   mExchange = "qpid.management"
   dExchange = "amq.direct"
 
+  def setHeader (self, codec, opcode, cls = 0):
+    codec.encode_octet (ord ('A'))
+    codec.encode_octet (ord ('M'))
+    codec.encode_octet (ord ('0'))
+    codec.encode_octet (ord ('1'))
+    codec.encode_octet (opcode)
+    codec.encode_octet (cls)
+
   def checkHeader (self, codec):
     octet = chr (codec.decode_octet ())
     if octet != 'A':
-      return 0
+      return None
     octet = chr (codec.decode_octet ())
     if octet != 'M':
-      return 0
+      return None
     octet = chr (codec.decode_octet ())
     if octet != '0':
-      return 0
+      return None
     octet = chr (codec.decode_octet ())
     if octet != '1':
-      return 0
-    return 1
+      return None
+    opcode = chr (codec.decode_octet ())
+    cls    = chr (codec.decode_octet ())
+    return (opcode, cls)
 
   def publish_cb (self, msg):
     codec = Codec (StringIO (msg.content.body), self.spec)
 
-    if self.checkHeader (codec) == 0:
+    hdr = self.checkHeader (codec)
+    if hdr == None:
       raise ValueError ("outer header invalid");
 
-    self.metadata.parse (codec)
+    self.metadata.parse (codec, hdr[0], hdr[1])
     msg.complete ()
 
   def reply_cb (self, msg):
     codec    = Codec (StringIO (msg.content.body), self.spec)
-    sequence = codec.decode_long ()
-    status   = codec.decode_long ()
+    hdr = self.checkHeader (codec)
+    if hdr == None:
+      msg.complete ()
+      return
+    if hdr[0] != 'R':
+      msg.complete ()
+      return
+
+    sequence = codec.decode_long  ()
+    status   = codec.decode_long  ()
     sText    = codec.decode_shortstr ()
 
     data = self.sequenceManager.release (sequence)
@@ -369,9 +401,10 @@
               methodName, args=None, packageName="qpid"):
     codec = Codec (StringIO (), self.spec);
     sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+    self.setHeader (codec, ord ('M'))
     codec.encode_long     (sequence)    # Method sequence id
     codec.encode_longlong (objId)       # ID of object
-    codec.encode_shortstr (self.rqname) # name of reply queue
+    #codec.encode_shortstr (self.rqname) # name of reply queue
 
     # Encode args according to schema
     if (className,'M') not in self.metadata.schema:
@@ -402,6 +435,8 @@
     msg["content_type"] = "application/octet-stream"
     msg["routing_key"]  = "method." + packageName + "." + className + "." + methodName
     msg["reply_to"]     = self.spec.struct ("reply_to")
+    msg["reply_to"]["exchange_name"] = "amq.direct"
+    msg["reply_to"]["routing_key"]   = self.rqname
     self.channel.message_transfer (destination="qpid.management", content=msg)
 
   def isConnected (self):
@@ -414,7 +449,7 @@
       self.client = Client (self.host, self.port, self.spec)
       self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
       self.channel = self.client.channel (1)
-      response = self.channel.session_open (detached_lifetime=10)
+      response = self.channel.session_open (detached_lifetime=300)
       self.qname  = "mgmt-"  + base64.urlsafe_b64encode (response.session_id)
       self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
 

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Wed Jan  2 07:56:20 2008
@@ -65,26 +65,23 @@
     <configElement name="stagingThreshold"     type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
     <configElement name="storeLib"             type="sstr"   access="RO" desc="Name of persistent storage library"/>
     <configElement name="asyncStore"           type="bool"   access="RO" desc="Use async persistent store"/>
-    <configElement name="mgmtPubInterval"      type="uint16" min="1" access="RW" unit="second" desc="Interval for management broadcasts"/>
+    <configElement name="mgmtPubInterval"      type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
     <configElement name="initialDiskPageSize"  type="uint32" access="RO" desc="Number of disk pages allocated for storage"/>
     <configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/>
     <configElement name="clusterName"          type="sstr"   access="RO"
                    desc="Name of cluster this server is a member of, zero-length for standalone server"/>
     <configElement name="version"              type="sstr"   access="RO" desc="Running software version"/>
 
-
-
     <method name="joinCluster">
       <arg name="clusterName" dir="I" type="sstr"/>
     </method>
 
     <method name="leaveCluster"/>
 
-    <method name="echo">
+    <method name="echo" desc="Request a response to test the path to the management agent">
       <arg name="sequence" dir="IO" type="uint32" default="0"/>
       <arg name="body"     dir="IO" type="lstr"   default=""/>
     </method>
-<!--<method name="crash" desc="Temporary test method to crash the broker"/>-->
   </class>
 
   <!--
@@ -135,12 +132,11 @@
     <instElement name="consumers"           type="hilo32"  unit="consumer"    desc="Current consumers on queue"/>
     <instElement name="bindings"            type="hilo32"  unit="binding"     desc="Current bindings"/>
     <instElement name="unackedMessages"     type="hilo32"  unit="message"     desc="Messages consumed but not yet acked"/>
+    <instElement name="messageLatencyMin"   type="uint64"  unit="nanosecond"  desc="Minimum broker latency through this queue"/>
+    <instElement name="messageLatencyMax"   type="uint64"  unit="nanosecond"  desc="Maximum broker latency through this queue"/>
+    <instElement name="messageLatencyAvg"   type="uint64"  unit="nanosecond"  desc="Average broker latency through this queue"/>
 
-    <method name="purge"               desc="Discard all messages on queue"/>
-    <method name="increaseJournalSize" desc="Increase number of disk pages allocated for this queue">
-      <arg name="pages" type="uint32" dir="I" desc="New total page allocation"/>
-    </method>
-
+    <method name="purge" desc="Discard all messages on queue"/>
   </class>
 
   <!--
@@ -184,17 +180,16 @@
   -->
   <class name="client">
     <configElement name="vhostRef" type="objId"  access="RC" index="y" parentRef="y"/>
-    <configElement name="ipAddr"   type="uint32" access="RC" index="y"/>
-    <configElement name="port"     type="uint16" access="RC" index="y"/>
+    <configElement name="address"  type="sstr"   access="RC" index="y"/>
 
-    <instElement name="authIdentity"  type="sstr"/>
-    <instElement name="msgsProduced"  type="count64"/>
-    <instElement name="msgsConsumed"  type="count64"/>
-    <instElement name="bytesProduced" type="count64"/>
-    <instElement name="bytesConsumed" type="count64"/>
+    <instElement name="closing"          type="bool" desc="This client is closing by management request"/>
+    <instElement name="authIdentity"     type="sstr"/>
+    <instElement name="framesFromClient" type="count64"/>
+    <instElement name="framesToClient"   type="count64"/>
+    <instElement name="bytesFromClient"  type="count64"/>
+    <instElement name="bytesToClient"    type="count64"/>
 
     <method name="close"/> 
-    <method name="detach"/>
   </class>
 
   <!--
@@ -205,11 +200,12 @@
   <class name="session">
     <configElement name="vhostRef"         type="objId"  access="RC" index="y" parentRef="y"/>
     <configElement name="name"             type="sstr"   access="RC" index="y"/>
+    <configElement name="channelId"        type="uint16" access="RO"/>
     <configElement name="clientRef"        type="objId"  access="RO"/>
-    <configElement name="detachedLifespan" type="uint32" access="RO"/>
+    <configElement name="detachedLifespan" type="uint32" access="RO" unit="second"/>
 
     <instElement name="attached"          type="bool"/>
-    <instElement name="remainingLifespan" type="count32"/>
+    <instElement name="expireTime"        type="absTime"/>
     <instElement name="framesOutstanding" type="count32"/>
 
     <method name="solicitAck"/>

Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=608135&r1=608134&r2=608135&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Wed Jan  2 07:56:20 2008
@@ -19,14 +19,16 @@
   under the License.
 -->
 
-<type name="objId"   base="U64"  cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
-<type name="uint8"   base="U8"   cpp="uint8_t"     encode="@.putOctet (#)"       decode="# = @.getOctet ()"     accessor="direct" init="0"/>
-<type name="uint16"  base="U16"  cpp="uint16_t"    encode="@.putShort (#)"       decode="# = @.getShort ()"     accessor="direct" init="0"/>
-<type name="uint32"  base="U32"  cpp="uint32_t"    encode="@.putLong (#)"        decode="# = @.getLong ()"      accessor="direct" init="0"/>
-<type name="uint64"  base="U64"  cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
-<type name="bool"    base="U8"   cpp="bool"        encode="@.putOctet (#?1:0)"   decode="# = @.getOctet ()==1"  accessor="direct" init="0"/>
-<type name="sstr"    base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)"  accessor="direct" init='""'/>
-<type name="lstr"    base="LSTR" cpp="std::string" encode="@.putLongString (#)"  decode="@.getLongString (#)"   accessor="direct" init='""'/>
+<type name="objId"     base="REF"       cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="uint8"     base="U8"        cpp="uint8_t"     encode="@.putOctet (#)"       decode="# = @.getOctet ()"     accessor="direct" init="0"/>
+<type name="uint16"    base="U16"       cpp="uint16_t"    encode="@.putShort (#)"       decode="# = @.getShort ()"     accessor="direct" init="0"/>
+<type name="uint32"    base="U32"       cpp="uint32_t"    encode="@.putLong (#)"        decode="# = @.getLong ()"      accessor="direct" init="0"/>
+<type name="uint64"    base="U64"       cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="bool"      base="BOOL"      cpp="uint8_t"     encode="@.putOctet (#?1:0)"   decode="# = @.getOctet ()==1"  accessor="direct" init="0"/>
+<type name="sstr"      base="SSTR"      cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)"  accessor="direct" init='""'/>
+<type name="lstr"      base="LSTR"      cpp="std::string" encode="@.putLongString (#)"  decode="@.getLongString (#)"   accessor="direct" init='""'/>
+<type name="absTime"   base="ABSTIME"   cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="deltaTime" base="DELTATIME" cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
 
 <type name="hilo8"   base="U8"   cpp="uint8_t"  encode="@.putOctet (#)"    decode="# = @.getOctet ()"    style="wm" accessor="counter" init="0"/>
 <type name="hilo16"  base="U16"  cpp="uint16_t" encode="@.putShort (#)"    decode="# = @.getShort ()"    style="wm" accessor="counter" init="0"/>



Mime
View raw message