Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 18113 invoked from network); 14 Feb 2011 15:06:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 14 Feb 2011 15:06:02 -0000 Received: (qmail 35447 invoked by uid 500); 14 Feb 2011 15:06:02 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 35352 invoked by uid 500); 14 Feb 2011 15:05:59 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 35340 invoked by uid 99); 14 Feb 2011 15:05:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Feb 2011 15:05:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Feb 2011 15:05:56 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8236F2388900; Mon, 14 Feb 2011 15:05:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1070515 - in /qpid/branches/qpid-2935/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Broker.h qpid/broker/QueueFlowLimit.cpp qpid/broker/QueueFlowLimit.h tests/QueueFlowLimitTest.cpp tests/QueuePolicyTest.cpp tests/QueueTest.cpp Date: Mon, 14 Feb 2011 15:05:36 -0000 To: commits@qpid.apache.org From: kgiusti@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110214150536.8236F2388900@eris.apache.org> Author: kgiusti Date: Mon Feb 14 15:05:35 2011 New Revision: 1070515 URL: http://svn.apache.org/viewvc?rev=1070515&view=rev Log: QPID-2935: make flow control enabled by default Modified: 
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb 14 15:05:35 2011 @@ -31,6 +31,7 @@ #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" @@ -102,7 +103,9 @@ Broker::Options::Options(const std::stri requireEncrypted(false), maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. 
- qmf2Support(false) + qmf2Support(false), + queueFlowStopRatio(80), + queueFlowResumeRatio(70) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -134,7 +137,9 @@ Broker::Options::Options(const std::stri ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location") ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") - ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") + ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.") + ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated."); } const std::string empty; @@ -219,6 +224,7 @@ Broker::Broker(const Broker::Options& co } QueuePolicy::setDefaultMaxSize(conf.queueLimit); + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); queues.setQueueEvents(&queueEvents); // Early-Initialize plugins Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h?rev=1070515&r1=1070514&r2=1070515&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h Mon Feb 14 15:05:35 2011 @@ -115,6 +115,8 @@ public: uint32_t maxSessionRate; bool asyncQueueEvents; bool qmf2Support; + uint queueFlowStopRatio; // producer flow control: on + 
uint queueFlowResumeRatio; // producer flow control: off private: std::string getHome(); Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Feb 14 15:05:35 2011 @@ -219,7 +219,7 @@ void QueueFlowLimit::encode(Buffer& buff } -void QueueFlowLimit::decode ( Buffer& buffer ) +void QueueFlowLimit::decode ( Buffer& buffer ) { flowStopCount = buffer.getLong(); flowResumeCount = buffer.getLong(); @@ -244,21 +244,58 @@ const std::string QueueFlowLimit::flowSt const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count"); const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size"); const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size"); +uint64_t QueueFlowLimit::defaultMaxSize; +uint QueueFlowLimit::defaultFlowStopRatio; +uint QueueFlowLimit::defaultFlowResumeRatio; + + +void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint flowResumeRatio) +{ + defaultMaxSize = maxQueueSize; + defaultFlowStopRatio = flowStopRatio; + defaultFlowResumeRatio = flowResumeRatio; + + /** @todo Verify valid range on Broker::Options instead of here */ + if (flowStopRatio > 100 || flowResumeRatio > 100) + throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" + << " flowStopRatio=" << flowStopRatio + << " flowResumeRatio=" << flowResumeRatio)); + if (flowResumeRatio > flowStopRatio) + throw InvalidArgumentException(QPID_MSG("Default queue flow stop ratio must be >= flow resume ratio:" + << " flowStopRatio=" << flowStopRatio + << " flowResumeRatio=" << 
flowResumeRatio)); +} std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings) { - uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); - uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); - uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); - uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); + std::string type(QueuePolicy::getType(settings)); - if (flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) { + if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) { + // The size of a RING queue is limited by design - no need for flow control. + return std::auto_ptr<QueueFlowLimit>(); + } + + if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) { + uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); + uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); + uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); + uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); + if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control + return std::auto_ptr<QueueFlowLimit>(); + } return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); - } else { - return std::auto_ptr<QueueFlowLimit>(); } + + if (defaultFlowStopRatio) { + uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); + uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); + uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + + return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize)); + } + return std::auto_ptr<QueueFlowLimit>(); } Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1070515&r1=1070514&r2=1070515&view=diff 
============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Mon Feb 14 15:05:35 2011 @@ -52,6 +52,10 @@ namespace broker { */ class QueueFlowLimit { + static uint64_t defaultMaxSize; + static uint defaultFlowStopRatio; + static uint defaultFlowResumeRatio; + Queue *queue; std::string queueName; @@ -92,6 +96,8 @@ class QueueFlowLimit uint32_t encodedSize() const; static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio); + friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&); protected: Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Mon Feb 14 15:05:35 2011 @@ -23,6 +23,7 @@ #include "unit_test.h" #include "test_tools.h" +#include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" @@ -302,6 +303,106 @@ QPID_AUTO_TEST_CASE(testFlowCombo) } +QPID_AUTO_TEST_CASE(testFlowDefaultArgs) +{ + QueueFlowLimit::setDefaults(2950001, // max queue byte count + 80, // 80% stop threshold + 70); // 70% resume threshold + FieldTable args; + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 2065000, 
flow->getFlowResumeSize()); + BOOST_CHECK_EQUAL( 0, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL( 0, flow->getFlowResumeCount()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowOverrideArgs) +{ + QueueFlowLimit::setDefaults(2950001, // max queue byte count + 80, // 80% stop threshold + 70); // 70% resume threshold + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 35000); + args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + } + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); + args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + } + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 35000); + args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); + args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); + args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint64_t) 350000, 
flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + } +} + + +QPID_AUTO_TEST_CASE(testFlowOverrideDefaults) +{ + QueueFlowLimit::setDefaults(2950001, // max queue byte count + 97, // stop threshold + 73); // resume threshold + FieldTable args; + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowDisable) +{ + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 0); + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK(!flow.get()); + } + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopSizeKey, 0); + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK(!flow.get()); + } +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp Mon Feb 14 15:05:35 2011 @@ -23,6 +23,7 @@ #include "test_tools.h" #include "qpid/broker/QueuePolicy.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/client/QueueOptions.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" @@ -340,6 +341,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo //fallback to rejecting messages QueueOptions args; args.setSizePolicy(FLOW_TO_DISK, 0, 5); + // Disable flow 
control, or else we'll never hit the max limit + args.setInt(QueueFlowLimit::flowStopCountKey, 0); ProxySessionFixture f; std::string q("my-queue"); Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp Mon Feb 14 15:05:35 2011 @@ -36,6 +36,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/broker/QueuePolicy.h" +#include "qpid/broker/QueueFlowLimit.h" + #include #include "boost/format.hpp" @@ -508,6 +511,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); + // disable flow control, as this test violates the enqueue/dequeue sequence. + args.setInt(QueueFlowLimit::flowStopCountKey, 0); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org