qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1065724 - in /qpid/branches/qpid-2935/qpid/cpp/src: qpid/broker/QueuePolicy.cpp qpid/broker/QueuePolicy.h tests/QueuePolicyTest.cpp
Date Mon, 31 Jan 2011 19:35:55 GMT
Author: kgiusti
Date: Mon Jan 31 19:35:54 2011
New Revision: 1065724

URL: http://svn.apache.org/viewvc?rev=1065724&view=rev
Log:
remove changes to QueuePolicy in lieu of QueueFlowLimits

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1065724&r1=1065723&r2=1065724&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Mon Jan 31 19:35:54 2011
@@ -29,114 +29,33 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-namespace {
-    /** ensure that the configured flow control stop and resume values are
-     * valid with respect to the maximum queue capacity, and each other
-     */
-    template <typename T>
-    void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
-    {
-        if (resume > stop) {
-            throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
-                                                    << "=" << resume
-                                                    << " must be less than qpid.flow_stop_" << type
-                                                    << "=" << stop));
-        }
-        if (resume == 0) resume = stop;
-        if (max != 0 && (max < stop)) {
-            throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
-                                                    << "=" << stop
-                                                    << " must be less than qpid.max_" << type
-                                                    << "=" << max));
-        }
-    }
-
-    /** extract a capacity value as passing in an argument map
-     */
-    uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
-    {
-        FieldTable::ValuePtr v = settings.get(key);
-
-        int64_t result = 0;
-
-        if (!v) return defaultValue;
-        if (v->getType() == 0x23) {
-            QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
-        } else if (v->getType() == 0x33) {
-            QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
-        } else if (v->convertsTo<int64_t>()) {
-            result = v->get<int64_t>();
-            QPID_LOG(debug, "Got integer value for " << key << ": " << result);
-            if (result >= 0) return result;
-        } else if (v->convertsTo<string>()) {
-            string s(v->get<string>());
-            QPID_LOG(debug, "Got string value for " << key << ": " << s);
-            std::istringstream convert(s);
-            if (convert >> result && result >= 0) return result;
-        }
-
-        QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
-        return defaultValue;
-    }
-
-
-}
-
-QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type,
-                         uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize,  uint64_t _flowResumeSize)
-    : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false),
-      flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
-      flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
-      flowStopped(false), name(_name)
-{
-    validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", name );
-    validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", name );
-    QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize
-             << "; flowStopCount=" << flowStopCount << "; flowResumeCount=" << flowResumeCount
-             << "; flowStopSize=" << flowStopSize << "; flowResumeSize=" << flowResumeSize );
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
+    maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
+    QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
 }
 
 void QueuePolicy::enqueued(uint64_t _size)
 {
-    if (maxCount || flowStopCount) {
-        ++count;
-        if (flowStopCount && !flowStopped && count > flowStopCount) {
-            flowStopped = true;
-            QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
-        }
-    }
-
-    if (maxSize  || flowStopSize) {
-        size += _size;
-        if (flowStopSize && !flowStopped && size > flowStopSize) {
-            flowStopped = true;
-            QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
-        }
-    }
+    if (maxCount) ++count;
+    if (maxSize) size += _size;
 }
 
 void QueuePolicy::dequeued(uint64_t _size)
 {
-    if (maxCount || flowStopCount) {
+    if (maxCount) {
         if (count > 0) {
             --count;
         } else {
             throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
         }
     }
-    if (maxSize || flowStopSize) {
+    if (maxSize) {
         if (_size > size) {
             throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
         } else {
             size -= _size;
         }
     }
-    if (flowStopped &&
-        (flowResumeSize == 0 || size < flowResumeSize) &&
-        (flowResumeCount == 0 || count < flowResumeCount)) {
-        flowStopped = false;
-        QPID_LOG(info, "Queue \"" << name << "\": has drained below the flow control resume level. Producer flow control deactivated." );
-    }
 }
 
 bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
@@ -197,6 +116,32 @@ void QueuePolicy::update(FieldTable& set
     settings.setString(typeKey, type);
 }
 
+uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
+{
+    FieldTable::ValuePtr v = settings.get(key);
+
+    int32_t result = 0;
+
+    if (!v) return defaultValue;
+    if (v->getType() == 0x23) {
+        QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
+    } else if (v->getType() == 0x33) {
+        QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
+    } else if (v->convertsTo<int>()) {
+        result = v->get<int>();
+        QPID_LOG(debug, "Got integer value for " << key << ": " << result);
+        if (result >= 0) return result;
+    } else if (v->convertsTo<string>()) {
+        string s(v->get<string>());
+        QPID_LOG(debug, "Got string value for " << key << ": " << s);
+        std::istringstream convert(s);
+        if (convert >> result && result >= 0) return result;
+    }
+
+    QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
+    return defaultValue;
+}
+
 std::string QueuePolicy::getType(const FieldTable& settings)
 {
     FieldTable::ValuePtr v = settings.get(typeKey);
@@ -224,10 +169,6 @@ void QueuePolicy::encode(Buffer& buffer)
   buffer.putLongLong(maxSize);
   buffer.putLong(count);
   buffer.putLongLong(size);
-  buffer.putLong(flowStopCount);
-  buffer.putLong(flowResumeCount);
-  buffer.putLongLong(flowStopSize);
-  buffer.putLongLong(flowResumeSize);
 }
 
 void QueuePolicy::decode ( Buffer& buffer ) 
@@ -236,10 +177,6 @@ void QueuePolicy::decode ( Buffer& buffe
   maxSize  = buffer.getLongLong();
   count    = buffer.getLong();
   size     = buffer.getLongLong();
-  flowStopCount   = buffer.getLong();
-  flowResumeCount = buffer.getLong();
-  flowStopSize    = buffer.getLongLong();
-  flowResumeSize  = buffer.getLongLong();
 }
 
 
@@ -247,11 +184,7 @@ uint32_t QueuePolicy::encodedSize() cons
   return sizeof(uint32_t) +  // maxCount
          sizeof(uint64_t) +  // maxSize
          sizeof(uint32_t) +  // count
-         sizeof(uint64_t) +  // size
-         sizeof(uint32_t) +  // flowStopCount
-         sizeof(uint32_t) +  // flowResumecount
-         sizeof(uint64_t) +  // flowStopSize
-         sizeof(uint64_t);   // flowResumeSize
+         sizeof(uint64_t);   // size
 }
 
 
@@ -259,21 +192,14 @@ uint32_t QueuePolicy::encodedSize() cons
 const std::string QueuePolicy::maxCountKey("qpid.max_count");
 const std::string QueuePolicy::maxSizeKey("qpid.max_size");
 const std::string QueuePolicy::typeKey("qpid.policy_type");
-const std::string QueuePolicy::flowStopCountKey("qpid.flow_stop_count");
-const std::string QueuePolicy::flowResumeCountKey("qpid.flow_resume_count");
-const std::string QueuePolicy::flowStopSizeKey("qpid.flow_stop_size");
-const std::string QueuePolicy::flowResumeSizeKey("qpid.flow_resume_size");
 const std::string QueuePolicy::REJECT("reject");
 const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
 const std::string QueuePolicy::RING("ring");
 const std::string QueuePolicy::RING_STRICT("ring_strict");
 uint64_t QueuePolicy::defaultMaxSize(0);
 
-FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize,
-                                   uint32_t _flowStopCount, uint32_t _flowResumeCount,
-                                   uint64_t _flowStopSize,  uint64_t _flowResumeSize)
-    : QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK,
-                  _flowStopCount, _flowResumeCount, _flowStopSize, _flowResumeSize) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : 
+    QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
 
 bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
 {
@@ -282,11 +208,8 @@ bool FlowToDiskPolicy::checkLimit(boost:
 }
 
 RingQueuePolicy::RingQueuePolicy(const std::string& _name, 
-                                 uint32_t _maxCount, uint64_t _maxSize, const std::string& _type,
-                                 uint32_t _flowStopCount, uint32_t _flowResumeCount,
-                                 uint64_t _flowStopSize,  uint64_t _flowResumeSize)
-    : QueuePolicy(_name, _maxCount, _maxSize, _type, _flowStopCount, _flowResumeCount,
-                  _flowStopSize, _flowResumeSize), strict(_type == RING_STRICT) {}
+                                 uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
+    QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
 
 bool before(const QueuedMessage& a, const QueuedMessage& b)
 {
@@ -394,34 +317,23 @@ std::auto_ptr<QueuePolicy> QueuePolicy::
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
 {
     uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
-    uint64_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
-    uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
-    uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
-    uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
-    uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
-
-    if (maxCount || maxSize || flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
-        return createQueuePolicy(name, maxCount, maxSize, getType(settings),
-                                 flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
+    uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+    if (maxCount || maxSize) {
+        return createQueuePolicy(name, maxCount, maxSize, getType(settings));
     } else {
         return std::auto_ptr<QueuePolicy>();
     }
 }
 
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, 
-                                                          uint32_t maxCount, uint64_t maxSize, const std::string& type,
-                                                          uint32_t flowStopCount, uint32_t flowResumeCount,
-                                                          uint64_t flowStopSize, uint64_t flowResumeSize)
+                                                          uint32_t maxCount, uint64_t maxSize, const std::string& type)
 {
     if (type == RING || type == RING_STRICT) {
-        return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type,
-                                                              flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+        return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
     } else if (type == FLOW_TO_DISK) {
-        return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize,
-                                                               flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+        return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
     } else {
-        return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type,
-                                                          flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+        return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
     }
 
 }
@@ -437,8 +349,6 @@ std::ostream& operator<<(std::ostream& o
     if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
     else out << "count: unlimited";    
     out << "; type=" << p.type;
-    if (p.flowStopCount) out << "; flowStopCount=" << p.flowStopCount << ", flowResumeCount=" << p.flowResumeCount;
-    if (p.flowStopSize) out << ";  flowStopSize=" << p.flowStopSize << ", flowResumeSize=" << p.flowResumeSize;
     return out;
 }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=1065724&r1=1065723&r2=1065724&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.h Mon Jan 31 19:35:54 2011
@@ -43,20 +43,8 @@ class QueuePolicy
     uint32_t count;
     uint64_t size;
     bool policyExceeded;
-
-    /**
-     * Producer flow control: when level is > flowStop*, flow control is ON.
-     * then level is < flowResume*, flow control is OFF.  If == 0, flow control
-     * is not used.  If both byte and msg count thresholds are set, then
-     * passing _either_ level may turn flow control ON, but _both_ must be
-     * below level before flow control will be turned OFF.
-     */
-    uint32_t flowStopCount;
-    uint32_t flowResumeCount;
-    uint64_t flowStopSize;
-    uint64_t flowResumeSize;
-    bool flowStopped;   // true = producers held in flow control
-
+            
+    static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
 
   protected:
     uint64_t getCurrentQueueSize() const { return size; } 
@@ -66,16 +54,10 @@ class QueuePolicy
     static QPID_BROKER_EXTERN const std::string maxCountKey;
     static QPID_BROKER_EXTERN const std::string maxSizeKey;
     static QPID_BROKER_EXTERN const std::string typeKey;
-    static QPID_BROKER_EXTERN const std::string flowStopCountKey;
-    static QPID_BROKER_EXTERN const std::string flowResumeCountKey;
-    static QPID_BROKER_EXTERN const std::string flowStopSizeKey;
-    static QPID_BROKER_EXTERN const std::string flowResumeSizeKey;
-
-    // Policy types:
     static QPID_BROKER_EXTERN const std::string REJECT;
     static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK;
     static QPID_BROKER_EXTERN const std::string RING;
-    static QPID_BROKER_EXTERN const std::string RING_STRICT;
+    static QPID_BROKER_EXTERN const std::string RING_STRICT;            
 
     virtual ~QueuePolicy() {}
     QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
@@ -86,22 +68,14 @@ class QueuePolicy
     virtual bool isEnqueued(const QueuedMessage&);
     QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
     uint32_t getMaxCount() const { return maxCount; }
-    uint64_t getMaxSize() const { return maxSize; }
-    uint32_t getFlowStopCount() const { return flowStopCount; }
-    uint32_t getFlowResumeCount() const { return flowResumeCount; }
-    uint64_t getFlowStopSize() const { return flowStopSize; }
-    uint64_t getFlowResumeSize() const { return flowResumeSize; }
-    bool isFlowControlActive() const { return flowStopped; }
-    bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
+    uint64_t getMaxSize() const { return maxSize; }           
     void encode(framing::Buffer& buffer) const;
     void decode ( framing::Buffer& buffer );
     uint32_t encodedSize() const;
     virtual void getPendingDequeues(Messages& result);
 
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
-    static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT,
-                                                                           uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
-                                                                           uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
+    static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
     static std::string getType(const qpid::framing::FieldTable& settings);
@@ -111,9 +85,7 @@ class QueuePolicy
   protected:
     const std::string name;
 
-    QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT,
-                uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
-                uint64_t flowStopSize = 0,  uint64_t flowResumeSize = 0);
+    QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
 
     virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
     void enqueued(uint64_t size);
@@ -124,18 +96,14 @@ class QueuePolicy
 class FlowToDiskPolicy : public QueuePolicy
 {
   public:
-    FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize,
-                     uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
-                     uint64_t flowStopSize = 0,  uint64_t flowResumeSize = 0);
+    FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
     bool checkLimit(boost::intrusive_ptr<Message> msg);
 };
 
 class RingQueuePolicy : public QueuePolicy
 {
   public:
-    RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING,
-                    uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
-                    uint64_t flowStopSize = 0,  uint64_t flowResumeSize = 0);
+    RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
     void enqueued(const QueuedMessage&);
     void dequeued(const QueuedMessage&);
     bool isEnqueued(const QueuedMessage&);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1065724&r1=1065723&r2=1065724&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp Mon Jan 31 19:35:54 2011
@@ -53,8 +53,6 @@ QPID_AUTO_TEST_CASE(testCount)
     BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
     BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
 
-    BOOST_CHECK(!policy->monitorFlowControl());
-
     QueuedMessage msg = createMessage(10);
     for (size_t i = 0; i < 5; i++) {
         policy->tryEnqueue(msg.payload);
@@ -398,213 +396,6 @@ QPID_AUTO_TEST_CASE(testCapacityConversi
     } catch (const ResourceLimitExceededException&) {}
 }
 
-
-QPID_AUTO_TEST_CASE(testFlowCount)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT,
-                                                                     7, // flowStop
-                                                                     5));   // flowResume
-    BOOST_CHECK_EQUAL((uint32_t) 7, policy->getFlowStopCount());
-    BOOST_CHECK_EQUAL((uint32_t) 5, policy->getFlowResumeCount());
-    BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopSize());
-    BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeSize());
-    BOOST_CHECK(!policy->isFlowControlActive());
-    BOOST_CHECK(policy->monitorFlowControl());
-
-    QueuedMessage msg = createMessage(10);
-    for (size_t i = 0; i < 6; i++) {
-        policy->tryEnqueue(msg.payload);
-        BOOST_CHECK(!policy->isFlowControlActive());
-    }
-    BOOST_CHECK(!policy->isFlowControlActive());  // 6 on queue
-    policy->tryEnqueue(msg.payload);
-    BOOST_CHECK(!policy->isFlowControlActive());  // 7 on queue
-
-    policy->tryEnqueue(msg.payload);
-    BOOST_CHECK(policy->isFlowControlActive());   // 8 on queue, ON
-    policy->tryEnqueue(msg.payload);
-    BOOST_CHECK(policy->isFlowControlActive());   // 9 on queue
-
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 8 on queue
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 7 on queue
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 6 on queue
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 5 on queue
-
-    policy->dequeued(msg);
-    BOOST_CHECK(!policy->isFlowControlActive());  // 4 on queue, OFF
-}
-
-
-QPID_AUTO_TEST_CASE(testFlowSize)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT,
-                                                                     0, 0,     // flow-Count
-                                                                     70,    // flowStopSize
-                                                                     50));  // flowResumeSize
-    BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopCount());
-    BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeCount());
-    BOOST_CHECK_EQUAL((uint32_t) 70, policy->getFlowStopSize());
-    BOOST_CHECK_EQUAL((uint32_t) 50, policy->getFlowResumeSize());
-    BOOST_CHECK(!policy->isFlowControlActive());
-    BOOST_CHECK(policy->monitorFlowControl());
-
-    QueuedMessage msg = createMessage(10);
-    for (size_t i = 0; i < 6; i++) {
-        policy->tryEnqueue(msg.payload);
-        BOOST_CHECK(!policy->isFlowControlActive());
-    }
-    BOOST_CHECK(!policy->isFlowControlActive());  // 60 on queue
-    policy->tryEnqueue(msg.payload);
-    BOOST_CHECK(!policy->isFlowControlActive());  // 70 on queue
-
-    QueuedMessage tinyMsg = createMessage(1);
-    policy->tryEnqueue(tinyMsg.payload);
-    BOOST_CHECK(policy->isFlowControlActive());   // 71 on queue, ON
-    policy->tryEnqueue(msg.payload);
-    BOOST_CHECK(policy->isFlowControlActive());   // 81 on queue
-
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 71 on queue
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 61 on queue
-    policy->dequeued(msg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 51 on queue
-
-    policy->dequeued(tinyMsg);
-    BOOST_CHECK(policy->isFlowControlActive());   // 50 on queue
-    policy->dequeued(tinyMsg);
-    BOOST_CHECK(!policy->isFlowControlActive());  // 49 on queue, OFF
-    policy->dequeued(msg);
-    BOOST_CHECK(!policy->isFlowControlActive());  // 39 on queue
-}
-
-QPID_AUTO_TEST_CASE(testFlowArgs)
-{
-    FieldTable args;
-    const uint64_t stop(0x2FFFFFFFF);
-    const uint64_t resume(0x1FFFFFFFF);
-    args.setInt(QueuePolicy::flowStopCountKey, 30);
-    args.setInt(QueuePolicy::flowResumeCountKey, 21);
-    args.setUInt64(QueuePolicy::flowStopSizeKey, stop);
-    args.setUInt64(QueuePolicy::flowResumeSizeKey, resume);
-    args.setUInt64(QueuePolicy::maxSizeKey, stop + 1);      // needed to pass stop < max validation
-
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args));
-
-    BOOST_CHECK_EQUAL((uint32_t) 30, policy->getFlowStopCount());
-    BOOST_CHECK_EQUAL((uint32_t) 21, policy->getFlowResumeCount());
-    BOOST_CHECK_EQUAL(stop, policy->getFlowStopSize());
-    BOOST_CHECK_EQUAL(resume, policy->getFlowResumeSize());
-    BOOST_CHECK(!policy->isFlowControlActive());
-    BOOST_CHECK(policy->monitorFlowControl());
-}
-
-
-QPID_AUTO_TEST_CASE(testFlowCombo)
-{
-    FieldTable args;
-    args.setInt(QueuePolicy::flowStopCountKey, 10);
-    args.setInt(QueuePolicy::flowResumeCountKey, 5);
-    args.setUInt64(QueuePolicy::flowStopSizeKey, 200);
-    args.setUInt64(QueuePolicy::flowResumeSizeKey, 100);
-
-    QueuedMessage msg_1 = createMessage(1);
-    QueuedMessage msg_10 = createMessage(10);
-    QueuedMessage msg_50 = createMessage(50);
-    QueuedMessage msg_100 = createMessage(100);
-
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args));
-    BOOST_CHECK(!policy->isFlowControlActive());        // count:0  size:0
-
-    // verify flow control comes ON when only count passes its stop point.
-
-    for (size_t i = 0; i < 10; i++) {
-        policy->tryEnqueue(msg_10.payload);
-        BOOST_CHECK(!policy->isFlowControlActive());
-    }
-    // count:10 size:100
-
-    policy->tryEnqueue(msg_1.payload);  // count:11 size: 101  ->ON
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    for (size_t i = 0; i < 6; i++) {
-        policy->dequeued(msg_10);
-        BOOST_CHECK(policy->isFlowControlActive());
-    }
-    // count:5 size: 41
-
-    policy->dequeued(msg_1);        // count: 4 size: 40  ->OFF
-    BOOST_CHECK(!policy->isFlowControlActive());
-
-    for (size_t i = 0; i < 4; i++) {
-        policy->dequeued(msg_10);
-        BOOST_CHECK(!policy->isFlowControlActive());
-    }
-    // count:0 size:0
-
-    // verify flow control comes ON when only size passes its stop point.
-
-    policy->tryEnqueue(msg_100.payload);  // count:1 size: 100
-    BOOST_CHECK(!policy->isFlowControlActive());
-
-    policy->tryEnqueue(msg_50.payload);   // count:2 size: 150
-    BOOST_CHECK(!policy->isFlowControlActive());
-
-    policy->tryEnqueue(msg_50.payload);   // count:3 size: 200
-    BOOST_CHECK(!policy->isFlowControlActive());
-
-    policy->tryEnqueue(msg_1.payload);   // count:4 size: 201  ->ON
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    policy->dequeued(msg_100);              // count:3 size:101
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    policy->dequeued(msg_1);                // count:2 size:100
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    policy->dequeued(msg_50);               // count:1 size:50  ->OFF
-    BOOST_CHECK(!policy->isFlowControlActive());
-
-    // verify flow control remains ON until both thresholds drop below their
-    // resume point.
-
-    for (size_t i = 0; i < 8; i++) {
-        policy->tryEnqueue(msg_10.payload);
-        BOOST_CHECK(!policy->isFlowControlActive());
-    }
-    // count:9 size:130
-
-    policy->tryEnqueue(msg_10.payload);   // count:10 size: 140
-    BOOST_CHECK(!policy->isFlowControlActive());
-
-    policy->tryEnqueue(msg_1.payload);   // count:11 size: 141  ->ON
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    policy->tryEnqueue(msg_100.payload);   // count:12 size: 241  (both thresholds crossed)
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
-
-    policy->dequeued(msg_50);               // count:11 size:191
-    BOOST_CHECK(policy->isFlowControlActive());
-
-    for (size_t i = 0; i < 9; i++) {
-        policy->dequeued(msg_10);
-        BOOST_CHECK(policy->isFlowControlActive());
-    }
-    // count:2 size:101
-    policy->dequeued(msg_1);                // count:1 size:100
-    BOOST_CHECK(policy->isFlowControlActive());   // still active due to size
-
-    policy->dequeued(msg_100);               // count:0 size:0  ->OFF
-    BOOST_CHECK(!policy->isFlowControlActive());
-}
-
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message