qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r740135 - in /qpid/trunk/qpid/cpp/src/qpid: broker/Connection.h broker/ConnectionHandler.cpp broker/ConnectionState.h broker/SessionState.cpp client/ConnectionHandler.cpp
Date Mon, 02 Feb 2009 22:28:17 GMT
Author: astitcher
Date: Mon Feb  2 22:28:17 2009
New Revision: 740135

URL: http://svn.apache.org/viewvc?rev=740135&view=rev
Log:
Send client property indicating that client supports
producer throttling in the Connection.OpenOK message.
Broker only tries to apply flow control to client if it
has received the property in the Connection.OpenOK message.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=740135&r1=740134&r2=740135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Mon Feb  2 22:28:17 2009
@@ -115,7 +115,6 @@
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
     ChannelMap channels;
-    //framing::AMQP_ClientProxy::Connection* client;
     ConnectionHandler adapter;
     const bool isLink;
     bool mgmtClosing;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=740135&r1=740134&r2=740135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon Feb  2 22:28:17 2009
@@ -45,6 +45,8 @@
 const std::string en_US     = "en_US";
 const std::string QPID_FED_LINK = "qpid.fed_link";
 const std::string QPID_FED_TAG  = "qpid.federation_tag";
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const int SESSION_FLOW_CONTROL_VER = 1;
 }
 
 void ConnectionHandler::close(connection::CloseCode code, const string& text)
@@ -139,6 +141,9 @@
         }
         QPID_LOG(info, "Connection is a federation link");
     }
+    if ( clientProperties.getAsInt(SESSION_FLOW_CONTROL) == SESSION_FLOW_CONTROL_VER ) {
+        connection.setClientThrottling();
+    }
 }
 
 void ConnectionHandler::Handler::secureOk(const string& response)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=740135&r1=740134&r2=740135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Mon Feb  2 22:28:17 2009
@@ -47,7 +47,8 @@
         heartbeat(0),
         heartbeatmax(120),
         stagingThreshold(broker.getStagingThreshold()),
-        federationLink(true)
+        federationLink(true),
+        clientSupportsThrottling(false)
         {}
 
     virtual ~ConnectionState () {}
@@ -73,6 +74,9 @@
     void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); }
     const string& getFederationPeerTag() const { return federationPeerTag; }
     std::vector<Url>& getKnownHosts() { return knownHosts; }
+    
+    void setClientThrottling() { clientSupportsThrottling = true; }
+    bool getClientThrottling() const { return clientSupportsThrottling; }
 
     Broker& getBroker() { return broker; }
 
@@ -98,6 +102,7 @@
     bool federationLink;
     string federationPeerTag;
     std::vector<Url> knownHosts;
+    bool clientSupportsThrottling;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=740135&r1=740134&r2=740135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb  2 22:28:17 2009
@@ -77,8 +77,12 @@
         }
     }
     uint32_t maxRate = broker.getOptions().maxSessionRate;
-    if (maxRate) {
-        rateFlowcontrol = new RateFlowcontrol(maxRate);
+    if (maxRate) { 
+        if (handler->getConnection().getClientThrottling()) {
+            rateFlowcontrol = new RateFlowcontrol(maxRate);
+        } else {
+            QPID_LOG(warning, getId() << ": Unable to flow control client - client
doesn't support");
+        }
     }
     attach(h);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=740135&r1=740134&r2=740135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Feb  2 22:28:17 2009
@@ -51,6 +51,8 @@
 const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
 const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
 
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const int SESSION_FLOW_CONTROL_VER = 1;
 }
 
 CloseCode ConnectionHandler::convert(uint16_t replyCode)
@@ -76,6 +78,8 @@
 
     FINISHED.insert(FAILED);
     FINISHED.insert(CLOSED);
+    
+    properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
 }
 
 void ConnectionHandler::incoming(AMQFrame& frame)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message