Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 42103 invoked from network); 11 Jan 2011 11:02:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Jan 2011 11:02:32 -0000 Received: (qmail 84261 invoked by uid 500); 11 Jan 2011 11:02:32 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 84194 invoked by uid 500); 11 Jan 2011 11:02:30 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 84187 invoked by uid 99); 11 Jan 2011 11:02:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jan 2011 11:02:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jan 2011 11:02:27 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0208C23888FE; Tue, 11 Jan 2011 11:02:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1057578 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Connection.cpp cpp/src/qpid/broker/Connection.h java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java specs/management-schema.xml tests/src/py/qpid_tests/broker_0_10/management.py Date: Tue, 11 Jan 2011 11:02:06 -0000 To: commits@qpid.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110111110207.0208C23888FE@eris.apache.org> Author: gsim Date: Tue Jan 11 11:02:05 2011 New Revision: 1057578 URL: http://svn.apache.org/viewvc?rev=1057578&view=rev Log: QPID-2991: added message counts to connection stats; fixed xxxToClient stats Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java qpid/trunk/qpid/specs/management-schema.xml qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1057578&r1=1057577&r2=1057578&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jan 11 11:02:05 2011 @@ -31,6 +31,7 @@ #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventClientConnect.h" #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" @@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputH timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_) + shadow(shadow_), + outboundTracker(*this) { + outboundTracker.wrap(out); if (isLink) links.notifyConnection(mgmtId, this); // In a cluster, allow adding the management object to be delayed. @@ -160,27 +163,46 @@ void Connection::received(framing::AMQFr getChannel(frame.getChannel()).in(frame); } - if (isLink) + if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); } -void Connection::recordFromServer(framing::AMQFrame& frame) +void Connection::sent(const framing::AMQFrame& frame) +{ + if (isLink) //i.e. we are acting as the client to another broker + recordFromClient(frame); + else + recordFromServer(frame); +} + +bool isMessage(const AMQMethodBody* method) +{ + return method && method->isA(); +} + +void Connection::recordFromServer(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); mgmtObject->inc_bytesToClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsToClient(); + } } } -void Connection::recordFromClient(framing::AMQFrame& frame) +void Connection::recordFromClient(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); mgmtObject->inc_bytesFromClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsFromClient(); + } } } @@ -442,4 +464,21 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } +Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} +void Connection::OutboundFrameTracker::close() { next->close(); } +size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } +void Connection::OutboundFrameTracker::abort() { next->abort(); } +void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } +void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } +void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) +{ + next->send(f); + con.sent(f); +} +void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) +{ + next = p.get(); + p.set(this); +} + }} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1057578&r1=1057577&r2=1057578&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jan 11 11:02:05 2011 @@ -111,8 +111,8 @@ class Connection : public sys::Connectio ManagementMethod (uint32_t methodId, management::Args& args, std::string&); void requestIOProcessing (boost::function0); - void recordFromServer (framing::AMQFrame& frame); - void recordFromClient (framing::AMQFrame& frame); + void recordFromServer (const framing::AMQFrame& frame); + void recordFromClient (const framing::AMQFrame& frame); std::string getAuthMechanism(); std::string getAuthCredentials(); std::string getUsername(); @@ -181,7 +181,29 @@ class Connection : public sys::Connectio ErrorListener* errorListener; uint64_t objectId; bool shadow; + /** + * Chained ConnectionOutputHandler that allows outgoing frames to be + * tracked (for updating mgmt stats). + */ + class OutboundFrameTracker : public sys::ConnectionOutputHandler + { + public: + OutboundFrameTracker(Connection&); + void close(); + size_t getBuffered() const; + void abort(); + void activateOutput(); + void giveReadCredit(int32_t credit); + void send(framing::AMQFrame&); + void wrap(sys::ConnectionOutputHandlerPtr&); + private: + Connection& con; + sys::ConnectionOutputHandler* next; + }; + OutboundFrameTracker outboundTracker; + + void sent(const framing::AMQFrame& f); public: qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } }; Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1057578&r1=1057577&r2=1057578&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Tue Jan 11 11:02:05 2011 @@ -1249,6 +1249,18 @@ public class QMFService implements Confi return 0l; } + public Long getMsgsFromClient() + { + // TODO + return 0l; + } + + public Long getMsgsToClient() + { + // TODO + return 0l; + } + public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory) { _obj.mgmtClose(); Modified: qpid/trunk/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1057578&r1=1057577&r2=1057578&view=diff ============================================================================== --- qpid/trunk/qpid/specs/management-schema.xml (original) +++ qpid/trunk/qpid/specs/management-schema.xml Tue Jan 11 11:02:05 2011 @@ -251,6 +251,8 @@ + + Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1057578&r1=1057577&r2=1057578&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Tue Jan 11 11:02:05 2011 @@ -490,3 +490,33 @@ class ManagementTest (TestBase010): self.assertEqual(queue.bindingCount, 1, "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + def test_connection_stats(self): + """ + Test message in/out stats for connection + """ + self.startQmf() + conn = self.connect() + session = conn.session("stats-session") + + #using qmf find named session and the corresponding connection: + conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_ + + #send a message to a queue + session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc")) + + #check the 'msgs sent from' stat for this connection + conn_qmf.update() + self.assertEqual(conn_qmf.msgsFromClient, 1) + + #receive message from queue + session.message_subscribe(destination="d", queue="stats-q") + incoming = session.incoming("d") + incoming.start() + self.assertEqual("abc", incoming.get(timeout=1).body) + + #check the 'msgs sent to' stat for this connection + conn_qmf.update() + self.assertEqual(conn_qmf.msgsToClient, 1) + + --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org