qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r724857 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/ClusterPlugin.cpp qpid/cluster/Connection.cpp tests/latencytest.cpp
Date Tue, 09 Dec 2008 20:08:48 GMT
Author: aconway
Date: Tue Dec  9 12:08:47 2008
New Revision: 724857

URL: http://svn.apache.org/viewvc?rev=724857&view=rev
Log:
Cluster: Option --cluster-read-max configures read-to-redeliver flow-control.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=724857&r1=724856&r2=724857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Dec  9 12:08:47 2008
@@ -85,7 +85,7 @@
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
 
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool useQuorum) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_) :
     broker(b),
     poller(b.getPoller()),
     cpg(*this),
@@ -104,7 +104,8 @@
     mgmtObject(0),
     state(INIT),
     lastSize(0),
-    lastBroker(false)
+    lastBroker(false),
+    readMax(readMax_)
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
@@ -119,7 +120,7 @@
     deliverQueue.start();
     mcastQueue.start();
     QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
-    if (useQuorum) quorum.init();
+    if (quorum_) quorum.init();
     cpg.join(name);
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=724857&r1=724856&r2=724857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Dec  9 12:08:47 2008
@@ -64,11 +64,9 @@
     typedef std::vector<ConnectionPtr> Connections;
     
     /**
-     * Join a cluster.
-     * @param name of the cluster.
-     * @param url of this broker, sent to the cluster.
+     * Join a cluster. 
      */
-    Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum);
+    Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax);
 
     virtual ~Cluster();
 
@@ -95,6 +93,8 @@
 
     boost::function<bool ()> isQuorate;
     void checkQuorum();
+
+    size_t getReadMax() { return readMax; }
     
   private:
     typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
@@ -215,6 +215,7 @@
     boost::shared_ptr<FailoverExchange> failoverExchange;
 
     Quorum quorum;
+    size_t readMax;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=724857&r1=724856&r2=724857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Dec  9 12:08:47 2008
@@ -41,8 +41,10 @@
     string name;
     string url;
     bool quorum;
+    size_t readMax;
 
-    ClusterValues() : quorum(false) {}
+    // FIXME aconway 2008-12-09: revisit default.
+    ClusterValues() : quorum(false), readMax(4) {}
   
     Url getUrl(uint16_t port) const {
         if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -66,6 +68,7 @@
 #if HAVE_LIBCMAN
             ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
+            ("cluster-read-max", optValue(values.readMax,"N"), "Max un-delivered reads per client connection, 0 means unlimited.")
             ;
     }
 };
@@ -85,7 +88,7 @@
         if (values.name.empty()) return; // Only if --cluster-name option was specified.
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum);
+        cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax);
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=724857&r1=724856&r2=724857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Dec  9 12:08:47 2008
@@ -74,8 +74,7 @@
 void Connection::init() {
     QPID_LOG(debug, cluster << " new connection: " << *this);
     if (isLocal() && !isCatchUp()) {
-        // FIXME aconway 2008-12-05: configurable credit limit
-        output.giveReadCredit(10);
+        output.giveReadCredit(cluster.getReadMax());
     }
 }
 
@@ -204,7 +203,8 @@
     ++deliverSeq;
     while (mcastDecoder.decode(buf))
         delivered(mcastDecoder.frame);
-    output.giveReadCredit(1);
+    if  (cluster.getReadMax())
+        output.giveReadCredit(1);
 }
 
 broker::SessionState& Connection::sessionState() {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=724857&r1=724856&r2=724857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Tue Dec  9 12:08:47 2008
@@ -56,7 +56,7 @@
     bool durable;
     string base;
 
-    Args() : size(256), count(1000), rate(0), reportFrequency(100),
+    Args() : size(256), count(1000), rate(0), reportFrequency(1000),
     	     timeLimit(0), queues(1), 
              prefetch(100), ack(0),
              durable(false), base("latency-test")



Mime
View raw message