qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r740433 - in /qpid/trunk/qpid: cpp/src/qpid/broker/SessionState.cpp cpp/src/qpid/broker/SessionState.h cpp/src/qpid/client/SessionImpl.cpp cpp/src/qpid/client/SessionImpl.h specs/management-schema.xml
Date Tue, 03 Feb 2009 20:41:04 GMT
Author: astitcher
Date: Tue Feb  3 20:41:04 2009
New Revision: 740433

URL: http://svn.apache.org/viewvc?rev=740433&view=rev
Log:
Add in management statistics for client flow control.

Really fixed Client library to count credit the same way the broker does.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Feb  3 20:41:04 2009
@@ -63,6 +63,14 @@
       mgmtObject(0),
       rateFlowcontrol(0)
 {
+    uint32_t maxRate = broker.getOptions().maxSessionRate;
+    if (maxRate) {
+        if (handler->getConnection().getClientThrottling()) {
+            rateFlowcontrol = new RateFlowcontrol(maxRate);
+        } else {
+            QPID_LOG(warning, getId() << ": Unable to flow control client - client
doesn't support");
+        }
+    }
     Manageable* parent = broker.GetVhostObject ();
     if (parent != 0) {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
@@ -72,25 +80,18 @@
             mgmtObject->set_attached (0);
             mgmtObject->set_detachedLifespan (0);
             mgmtObject->clr_expireTime();
+            if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
             ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
             agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
         }
     }
-    uint32_t maxRate = broker.getOptions().maxSessionRate;
-    if (maxRate) { 
-        if (handler->getConnection().getClientThrottling()) {
-            rateFlowcontrol = new RateFlowcontrol(maxRate);
-        } else {
-            QPID_LOG(warning, getId() << ": Unable to flow control client - client
doesn't support");
-        }
-    }
     attach(h);
 }
 
 SessionState::~SessionState() {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
-        
+
     if (flowControlTimer)
         flowControlTimer->cancel();
 }
@@ -213,14 +214,7 @@
     void fire() {
         // This is the best we can currently do to avoid a destruction/fire race
         if (!isCancelled()) {
-            // Send credit
-            AbsTime now = AbsTime::now();
-            uint32_t sendCredit = flowControl.receivedMessage(now, 0);
-            if ( sendCredit>0 ) {
-                QPID_LOG(debug, sessionState.getId() << ": send producer credit " <<
sendCredit);
-                sessionState.getProxy().getMessage().flow("", 0, sendCredit);
-                flowControl.sentCredit(now, sendCredit);
-            } else if ( flowControl.flowStopped() ) {
+            if ( !sessionState.processSendCredit(0) && flowControl.flowStopped()
) {
                 QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
                 reset();
                 timer.add(this);
@@ -270,13 +264,7 @@
             // TODO: Probably do message.stop("") first time then disconnect
             getProxy().getMessage().stop("");
         } else {
-            AbsTime now = AbsTime::now();
-            uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
-            if ( sendCredit>0 ) {
-                QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
-                getProxy().getMessage().flow("", 0, sendCredit);
-                rateFlowcontrol->sentCredit(now, sendCredit);
-            } else if ( rateFlowcontrol->flowStopped() ) {
+            if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) {
                 QPID_LOG(debug, getId() << ": Schedule sending credit");
                 Timer& timer = getBroker().getTimer();
                 // Use heuristic for scheduled credit of time for 50 messages, but not longer
than 500ms
@@ -288,6 +276,22 @@
     }
 }
 
+bool SessionState::processSendCredit(uint32_t msgs)
+{
+    AbsTime now = AbsTime::now();
+    uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
+    if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
+    if ( sendCredit>0 ) {
+        QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+        getProxy().getMessage().flow("", 0, sendCredit);
+        rateFlowcontrol->sentCredit(now, sendCredit);
+        if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 void SessionState::sendAcceptAndCompletion()
 {
     if (!accepted.empty()) {
@@ -357,10 +361,12 @@
 
     if (rateFlowcontrol) {
         // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs
worth
-        QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(),
100U));
+        uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U);
+        QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
         getProxy().getMessage().setFlowMode("", 0);
-        getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
-        rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(),
100U));
+        getProxy().getMessage().flow("", 0, credit);
+        rateFlowcontrol->sentCredit(AbsTime::now(), credit);
+        if (mgmtObject) mgmtObject->inc_clientCredit(credit);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Feb  3 20:41:04 2009
@@ -62,7 +62,7 @@
  * Broker-side session state includes session's handler chains, which
  * may themselves have state.
  */
-class SessionState : public qpid::SessionState, 
+class SessionState : public qpid::SessionState,
                      public SessionContext,
                      public DeliveryAdapter,
                      public management::Manageable,
@@ -79,7 +79,7 @@
 
     /** @pre isAttached() */
     framing::AMQP_ClientProxy& getProxy();
-    
+
     /** @pre isAttached() */
     ConnectionState& getConnection();
     bool isLocal(const ConnectionToken* t) const;
@@ -91,7 +91,7 @@
     void giveReadCredit(int32_t);
 
     void senderCompleted(const framing::SequenceSet& ranges);
-    
+
     void sendCompletion();
 
     //delivery adapter methods:
@@ -108,6 +108,8 @@
     SemanticState& getSemanticState() { return semanticState; }
     boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage();
}
 
+    bool processSendCredit(uint32_t msgs);
+
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber&
id);
@@ -124,7 +126,7 @@
     void sendAcceptAndCompletion();
 
     Broker& broker;
-    SessionHandler* handler;    
+    SessionHandler* handler;
     sys::AbsTime expiry;        // Used by SessionManager.
     SemanticState semanticState;
     SessionAdapter adapter;
@@ -133,7 +135,7 @@
     IncompleteMessageList::CompletionListener enqueuedOp;
     qmf::org::apache::qpid::broker::Session* mgmtObject;
     qpid::framing::SequenceSet accepted;
-    
+
     // State used for producer flow control (rate limited)
     RateFlowcontrol* rateFlowcontrol;
     boost::intrusive_ptr<TimerTask> flowControlTimer;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Feb  3 20:41:04 2009
@@ -329,6 +329,10 @@
 
 Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
 {
+    // Only message transfers have content
+    if (content && sendMsgCredit) {
+        sendMsgCredit->acquire();
+    }
     Acquire a(sendLock);
     SequenceNumber id = nextOut++;
     {
@@ -366,7 +370,7 @@
     uint64_t data_length = content.getData().length();
     if(data_length > 0){
         header.setLastSegment(false);
-        handleContentOut(header);   
+        handleOut(header);   
         /*Note: end of frame marker included in overhead but not in size*/
         const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); 
 
@@ -395,7 +399,7 @@
             }
         }
     } else {
-        handleContentOut(header);   
+        handleOut(header);   
     }
 }
 
@@ -448,14 +452,6 @@
     sendFrame(frame, true);
 }
 
-void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
-{
-    if (sendMsgCredit) {
-        sendMsgCredit->acquire();
-    }
-    sendFrame(frame, true);
-}
-
 void SessionImpl::proxyOut(AMQFrame& frame) // network thread
 {
     //Note: this case is treated slightly differently that command

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Tue Feb  3 20:41:04 2009
@@ -146,7 +146,6 @@
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
-    void handleContentOut(framing::AMQFrame& frame);
     /**
      * Sends session controls. This case is treated slightly
      * differently than command frames sent by the application via

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Tue Feb  3 20:41:04 2009
@@ -285,6 +285,7 @@
     <property name="detachedLifespan" type="uint32"  access="RO" unit="second"/>
     <property name="attached"         type="bool"    access="RO"/>
     <property name="expireTime"       type="absTime" access="RO" optional="y"/>
+    <property name="maxClientRate"    type="uint32"  access="RO" unit="msgs/sec" optional="y"/>
 
     <statistic name="framesOutstanding" type="count32"/>
 
@@ -293,6 +294,8 @@
     <statistic name="TxnRejects"   type="count64"  unit="transaction" desc="Total transactions
rejected"/>
     <statistic name="TxnCount"     type="count32"  unit="transaction" desc="Current pending
transactions"/>
 
+    <statistic name="clientCredit" type="count32" unit="message" desc="Client message
credit"/>
+
     <method name="solicitAck"/>
     <method name="detach"/>
     <method name="resetLifespan"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message