qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1057578 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Connection.cpp cpp/src/qpid/broker/Connection.h java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java specs/management-schema.xml tests/src/py/qpid_tests/broker_0_10/management.py
Date Tue, 11 Jan 2011 11:02:06 GMT
Author: gsim
Date: Tue Jan 11 11:02:05 2011
New Revision: 1057578

URL: http://svn.apache.org/viewvc?rev=1057578&view=rev
Log:
QPID-2991: added message counts to connection stats; fixed xxxToClient stats

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/specs/management-schema.xml
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1057578&r1=1057577&r2=1057578&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jan 11 11:02:05 2011
@@ -31,6 +31,7 @@
 #include "qpid/ptr_map.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qmf/org/apache/qpid/broker/EventClientConnect.h"
 #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
 
@@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputH
     timer(broker_.getTimer()),
     errorListener(0),
     objectId(objectId_),
-    shadow(shadow_)
+    shadow(shadow_),
+    outboundTracker(*this)
 {
+    outboundTracker.wrap(out);
     if (isLink)
         links.notifyConnection(mgmtId, this);
     // In a cluster, allow adding the management object to be delayed.
@@ -160,27 +163,46 @@ void Connection::received(framing::AMQFr
         getChannel(frame.getChannel()).in(frame);
     }
 
-    if (isLink)
+    if (isLink) //i.e. we are acting as the client to another broker
         recordFromServer(frame);
     else
         recordFromClient(frame);
 }
 
-void Connection::recordFromServer(framing::AMQFrame& frame)
+void Connection::sent(const framing::AMQFrame& frame)
+{
+    if (isLink) //i.e. we are acting as the client to another broker
+        recordFromClient(frame);
+    else
+        recordFromServer(frame);
+}
+
+bool isMessage(const AMQMethodBody* method)
+{
+    return method && method->isA<qpid::framing::MessageTransferBody>();
+}
+
+void Connection::recordFromServer(const framing::AMQFrame& frame)
 {
     if (mgmtObject != 0)
     {
         mgmtObject->inc_framesToClient();
         mgmtObject->inc_bytesToClient(frame.encodedSize());
+        if (isMessage(frame.getMethod())) {
+            mgmtObject->inc_msgsToClient();
+        }
     }
 }
 
-void Connection::recordFromClient(framing::AMQFrame& frame)
+void Connection::recordFromClient(const framing::AMQFrame& frame)
 {
     if (mgmtObject != 0)
     {
         mgmtObject->inc_framesFromClient();
         mgmtObject->inc_bytesFromClient(frame.encodedSize());
+        if (isMessage(frame.getMethod())) {
+            mgmtObject->inc_msgsFromClient();
+        }
     }
 }
 
@@ -442,4 +464,21 @@ void Connection::restartTimeout()
 
 bool Connection::isOpen() { return adapter.isOpen(); }
 
+Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {}
+void Connection::OutboundFrameTracker::close() { next->close(); }
+size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
+void Connection::OutboundFrameTracker::abort() { next->abort(); }
+void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
+void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
+void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
+{ 
+    next->send(f); 
+    con.sent(f);
+}
+void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)
+{
+    next = p.get();
+    p.set(this);
+}
+
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1057578&r1=1057577&r2=1057578&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jan 11 11:02:05 2011
@@ -111,8 +111,8 @@ class Connection : public sys::Connectio
         ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
 
     void requestIOProcessing (boost::function0<void>);
-    void recordFromServer (framing::AMQFrame& frame);
-    void recordFromClient (framing::AMQFrame& frame);
+    void recordFromServer (const framing::AMQFrame& frame);
+    void recordFromClient (const framing::AMQFrame& frame);
     std::string getAuthMechanism();
     std::string getAuthCredentials();
     std::string getUsername();
@@ -181,7 +181,29 @@ class Connection : public sys::Connectio
     ErrorListener* errorListener;
     uint64_t objectId;
     bool shadow;
+    /**
+     * Chained ConnectionOutputHandler that allows outgoing frames to be
+     * tracked (for updating mgmt stats).
+     */
+    class OutboundFrameTracker : public sys::ConnectionOutputHandler
+    {
+      public:
+        OutboundFrameTracker(Connection&);
+        void close();
+        size_t getBuffered() const;
+        void abort();
+        void activateOutput();
+        void giveReadCredit(int32_t credit);
+        void send(framing::AMQFrame&);
+        void wrap(sys::ConnectionOutputHandlerPtr&);
+      private:
+        Connection& con;
+        sys::ConnectionOutputHandler* next;
+    };
+    OutboundFrameTracker outboundTracker;
+    
 
+    void sent(const framing::AMQFrame& f);
   public:
     qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
 };

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1057578&r1=1057577&r2=1057578&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Tue Jan 11 11:02:05 2011
@@ -1249,6 +1249,18 @@ public class QMFService implements Confi
             return 0l;
         }
 
+        public Long getMsgsFromClient()
+        {
+            // TODO
+            return 0l;
+        }
+
+        public Long getMsgsToClient()
+        {
+            // TODO
+            return 0l;
+        }
+
         public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory)
         {
             _obj.mgmtClose();

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1057578&r1=1057577&r2=1057578&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Tue Jan 11 11:02:05 2011
@@ -251,6 +251,8 @@
     <statistic name="framesToClient"   type="count64"/>
     <statistic name="bytesFromClient"  type="count64"/>
     <statistic name="bytesToClient"    type="count64"/>
+    <statistic name="msgsFromClient"  type="count64"/>
+    <statistic name="msgsToClient"    type="count64"/>
 
     <method name="close"/> 
   </class>

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1057578&r1=1057577&r2=1057578&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Tue Jan 11 11:02:05 2011
@@ -490,3 +490,33 @@ class ManagementTest (TestBase010):
         self.assertEqual(queue.bindingCount, 1,
                          "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
 
+    def test_connection_stats(self):
+        """
+        Test message in/out stats for connection
+        """
+        self.startQmf()
+        conn = self.connect()
+        session = conn.session("stats-session")
+
+        #using qmf find named session and the corresponding connection:
+        conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_
+        
+        #send a message to a queue
+        session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc"))
+        
+        #check the 'msgs sent from' stat for this connection
+        conn_qmf.update()
+        self.assertEqual(conn_qmf.msgsFromClient, 1)
+
+        #receive message from queue
+        session.message_subscribe(destination="d", queue="stats-q")
+        incoming = session.incoming("d")
+        incoming.start()
+        self.assertEqual("abc", incoming.get(timeout=1).body)
+
+        #check the 'msgs sent to' stat for this connection
+        conn_qmf.update()
+        self.assertEqual(conn_qmf.msgsToClient, 1)
+
+        



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message