qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1065700 - in /qpid/branches/qpid-2935/qpid/cpp/src: Makefile.am qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/broker/QueueFlowLimit.cpp qpid/broker/QueueFlowLimit.h tests/Makefile.am tests/QueueFlowLimitTest.cpp
Date Mon, 31 Jan 2011 18:09:52 GMT
Author: kgiusti
Date: Mon Jan 31 18:09:51 2011
New Revision: 1065700

URL: http://svn.apache.org/viewvc?rev=1065700&view=rev
Log:
QPID-2935: add per-queue flow limits

Added:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am

Modified: qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am?rev=1065700&r1=1065699&r2=1065700&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am Mon Jan 31 18:09:51 2011
@@ -588,6 +588,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueueRegistry.cpp \
   qpid/broker/QueueRegistry.h \
   qpid/broker/QueuedMessage.h \
+  qpid/broker/QueueFlowLimit.h \
+  qpid/broker/QueueFlowLimit.cpp \
   qpid/broker/RateFlowcontrol.h \
   qpid/broker/RateTracker.cpp \
   qpid/broker/RateTracker.h \

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1065700&r1=1065699&r2=1065700&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jan 31 18:09:51 2011
@@ -27,6 +27,7 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/QueueFlowLimit.h"
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
@@ -166,11 +167,11 @@ void Queue::deliver(boost::intrusive_ptr
 
 void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
 {
-    if (policy.get()) policy->recoverEnqueued(msg);
+    if (policy.get()) policy->recoverEnqueued(msg);  // KAG INC COUNTERS
 }
 
 void Queue::recover(boost::intrusive_ptr<Message>& msg){
-    if (policy.get()) policy->recoverEnqueued(msg);
+    if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS
 
     push(msg, true);
     if (store){ 
@@ -351,7 +352,7 @@ bool Queue::browseNextMessage(QueuedMess
                 //consumer wants the message
                 c->position = msg.position;
                 m = msg;
-                if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
+                if (!lastValueQueueNoBrowse) clearLVQIndex(msg);    // prevent this msg from being replaced by LVQ
                 if (lastValueQueue) {
                     boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
                     if (replacement.get()) m.payload = replacement;
@@ -642,8 +643,9 @@ void Queue::push(boost::intrusive_ptr<Me
             else QPID_LOG(warning, "Enqueue manager not set, events not generated for " <<
getName());
         }
         if (policy.get()) {
-            policy->enqueued(qm);
+            policy->enqueued(qm);  // KAG STORE COPY
         }
+        if (flowLimit.get()) flowLimit->consume(qm);
     }
     copy.notify();
 }
@@ -746,7 +748,7 @@ bool Queue::enqueue(TransactionContext* 
         Messages dequeues;
         {
             Mutex::ScopedLock locker(messageLock);
-            policy->tryEnqueue(msg);
+            policy->tryEnqueue(msg);    // KAG INC COUNTERS
             policy->getPendingDequeues(dequeues);
         }
         //depending on policy, may have some dequeues that need to performed without holding the lock
@@ -784,7 +786,7 @@ bool Queue::enqueue(TransactionContext* 
 void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->enqueueAborted(msg);       
+    if (policy.get()) policy->enqueueAborted(msg);       // KAG DEC COUNTERS
 }
 
 // return true if store exists, 
@@ -841,7 +843,8 @@ void Queue::popAndDequeue()
  */
 void Queue::dequeued(const QueuedMessage& msg)
 {
-    if (policy.get()) policy->dequeued(msg);
+    if (policy.get()) policy->dequeued(msg);    // KAG REMOVE COPY, DEC COUNTERS
+    if (flowLimit.get()) flowLimit->replenish(msg);
     mgntDeqStats(msg.payload);
     if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
         eventMgr->dequeued(msg);
@@ -903,6 +906,8 @@ void Queue::configure(const FieldTable& 
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
+    flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
+
     if (mgmtObject != 0)
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
 
@@ -1176,9 +1181,10 @@ void Queue::enqueued(const QueuedMessage
 {
     if (m.payload) {
         if (policy.get()) {
-            policy->recoverEnqueued(m.payload);
-            policy->enqueued(m);
+            policy->recoverEnqueued(m.payload); // KAG INC COUNTERS
+            policy->enqueued(m);                // KAG STORE COPY
         }
+        if (flowLimit.get()) flowLimit->consume(m);
         mgntEnqStats(m.payload);
         boost::intrusive_ptr<Message> payload = m.payload;
         enqueue ( 0, payload, true );

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h?rev=1065700&r1=1065699&r2=1065700&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h Mon Jan 31 18:09:51 2011
@@ -56,6 +56,7 @@ class QueueEvents;
 class QueueRegistry;
 class TransactionContext;
 class Exchange;
+class QueueFlowLimit;
 
 /**
  * The brokers representation of an amqp queue. Messages are
@@ -112,6 +113,7 @@ class Queue : public boost::enable_share
     mutable uint64_t persistenceId;
     framing::FieldTable settings;
     std::auto_ptr<QueuePolicy> policy;
+    std::auto_ptr<QueueFlowLimit> flowLimit;
     bool policyExceeded;
     QueueBindings bindings;
     std::string alternateExchangeName;

Added: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1065700&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Jan 31 18:09:51 2011
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
+#include <sstream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace {
+    /** ensure that the configured flow control stop and resume values are
+     * valid with respect to the maximum queue capacity, and each other
+     */
+    template <typename T>
+    void validateFlowConfig(T max, T& stop, T& resume, const std::string& type,
const std::string& queue)
+    {
+        if (resume > stop) {
+            throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\":
qpid.flow_resume_" << type
+                                                    << "=" << resume
+                                                    << " must be less than qpid.flow_stop_"
<< type
+                                                    << "=" << stop));
+        }
+        if (resume == 0) resume = stop;
+        if (max != 0 && (max < stop)) {
+            throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\":
qpid.flow_stop_" << type
+                                                    << "=" << stop
+                                                    << " must be less than qpid.max_"
<< type
+                                                    << "=" << max));
+        }
+    }
+
+    /** extract a capacity value as passed in an argument map
+     */
+    uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t
defaultValue)
+    {
+        FieldTable::ValuePtr v = settings.get(key);
+
+        int64_t result = 0;
+
+        if (!v) return defaultValue;
+        if (v->getType() == 0x23) {
+            QPID_LOG(debug, "Value for " << key << " specified as float: " <<
v->get<float>());
+        } else if (v->getType() == 0x33) {
+            QPID_LOG(debug, "Value for " << key << " specified as double: " <<
v->get<double>());
+        } else if (v->convertsTo<int64_t>()) {
+            result = v->get<int64_t>();
+            QPID_LOG(debug, "Got integer value for " << key << ": " <<
result);
+            if (result >= 0) return result;
+        } else if (v->convertsTo<string>()) {
+            string s(v->get<string>());
+            QPID_LOG(debug, "Got string value for " << key << ": " << s);
+            std::istringstream convert(s);
+            if (convert >> result && result >= 0) return result;
+        }
+
+        QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer,
using default (" << defaultValue << ")");
+        return defaultValue;
+    }
+}
+
+
+
+QueueFlowLimit::QueueFlowLimit(Queue *_queue,
+                               uint32_t _flowStopCount, uint32_t _flowResumeCount,
+                               uint64_t _flowStopSize,  uint64_t _flowResumeSize)
+    : queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
+      flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
+      flowStopped(false), count(0), size(0)
+{
+    uint32_t maxCount(0);
+    uint64_t maxSize(0);
+
+    if (_queue) {
+        queueName = _queue->getName();
+        if (_queue->getPolicy()) {
+            maxSize = _queue->getPolicy()->getMaxSize();
+            maxCount = _queue->getPolicy()->getMaxCount();
+        }
+    }
+    validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
+    validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
+    QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount="
<< flowStopCount
+             << ", flowResumeCount=" << flowResumeCount
+             << ", flowStopSize=" << flowStopSize << ", flowResumeSize="
<< flowResumeSize );
+}
+
+
+
+void QueueFlowLimit::consume(const QueuedMessage& msg)
+{
+    if (!msg.payload) return;
+
+    sys::Mutex::ScopedLock l(pendingFlowLock);
+
+    ++count;
+    size += msg.payload->contentSize();
+
+    if (flowStopCount && !flowStopped && count > flowStopCount) {
+        flowStopped = true;
+        QPID_LOG(info, "Queue \"" << queueName << "\": has reached " <<
flowStopCount << " enqueued messages. Producer flow control activated." );
+    }
+
+    if (flowStopSize && !flowStopped && size > flowStopSize) {
+        flowStopped = true;
+        QPID_LOG(info, "Queue \"" << queueName << "\": has reached " <<
flowStopSize << " enqueued bytes. Producer flow control activated." );
+    }
+
+    // KAG: test - sanity check that this message is not already held in the index
+    if (index.find(msg.payload) != index.end()) {
+        QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice:
" << msg.position);
+    }
+    
+    if (flowStopped || !pendingFlow.empty()) {
+        msg.payload->getReceiveCompletion().startCompleter();    // don't complete until flow resumes
+        pendingFlow.push_back(msg.payload);
+        index.insert(msg.payload);
+    }
+}
+
+
+
+void QueueFlowLimit::replenish(const QueuedMessage& msg)
+{
+    if (!msg.payload) return;
+
+    sys::Mutex::ScopedLock l(pendingFlowLock);
+
+    if (count > 0) {
+        --count;
+    } else {
+        throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" <<
queueName));
+    }
+
+    uint64_t _size = msg.payload->contentSize();
+    if (_size <= size) {
+        size -= _size;
+    } else {
+        throw Exception(QPID_MSG("Flow limit size underflow on dequeue. Queue=" <<
queueName));
+    }
+
+    if (flowStopped &&
+        (flowResumeSize == 0 || size < flowResumeSize) &&
+        (flowResumeCount == 0 || count < flowResumeCount)) {
+        flowStopped = false;
+        QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the
flow control resume level. Producer flow control deactivated." );
+    }
+
+    if (!flowStopped && !pendingFlow.empty()) {
+        // if msg is flow controlled, release it.
+        std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+        if (itr != index.end()) {
+            (*itr)->getReceiveCompletion().finishCompleter();
+            index.erase(itr);
+            // inefficient: linear search of pendingFlow for the message just found in index
+            std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
+                                                                             pendingFlow.end(),
+                                                                             msg.payload);
+            if (itr2 == pendingFlow.end()) {
+                QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing
in list: " << msg.position);
+            } else {
+                pendingFlow.erase(itr2);
+            }
+        }
+
+        // for now, just release the oldest also
+        if (!pendingFlow.empty()) {
+            pendingFlow.front()->getReceiveCompletion().finishCompleter();
+            itr = index.find(pendingFlow.front());
+            if (itr == index.end()) {
+                QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in
index: " << pendingFlow.front());
+            } else {
+                index.erase(itr);
+            }
+            pendingFlow.pop_front();
+        }
+    }
+}
+
+
+void QueueFlowLimit::encode(Buffer& buffer) const
+{
+  buffer.putLong(flowStopCount);
+  buffer.putLong(flowResumeCount);
+  buffer.putLongLong(flowStopSize);
+  buffer.putLongLong(flowResumeSize);
+  buffer.putLong(count);
+  buffer.putLongLong(size);
+}
+
+
+void QueueFlowLimit::decode ( Buffer& buffer ) 
+{
+  flowStopCount   = buffer.getLong();
+  flowResumeCount = buffer.getLong();
+  flowStopSize    = buffer.getLongLong();
+  flowResumeSize  = buffer.getLongLong();
+  count    = buffer.getLong();
+  size     = buffer.getLongLong();
+}
+
+
+uint32_t QueueFlowLimit::encodedSize() const {
+  return sizeof(uint32_t) +  // flowStopCount
+         sizeof(uint32_t) +  // flowResumeCount
+         sizeof(uint64_t) +  // flowStopSize
+         sizeof(uint64_t) +  // flowResumeSize
+         sizeof(uint32_t) +  // count
+         sizeof(uint64_t);   // size
+}
+
+
+const std::string QueueFlowLimit::flowStopCountKey("qpid.flow_stop_count");
+const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count");
+const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size");
+const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size");
+
+
+std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const
qpid::framing::FieldTable& settings)
+{
+    uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
+    uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
+    uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
+    uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+
+    if (flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
+        return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount,
flowResumeCount,
+                                                                flowStopSize, flowResumeSize));
+    } else {
+        return std::auto_ptr<QueueFlowLimit>();
+    }
+}
+
+
+namespace qpid {
+    namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
+{
+    out << "; flowStopCount=" << f.flowStopCount << ", flowResumeCount="
<< f.flowResumeCount;
+    out << "; flowStopSize=" << f.flowStopSize << ", flowResumeSize=" <<
f.flowResumeSize;
+    return out;
+}
+
+    }
+}
+
+/**
+ * TBD:
+ * - Is there a direct way to determine if QM is on pendingFlow list?
+ * - Rate limit the granting of flow.
+ * - What about LVQ?  A newer msg may replace the older one.
+ * - What about queueing during a recovery?
+ * - What about queue purge?
+ * - What about message move?
+ * - How do we treat orphaned messages?
+ * -- Xfer a message to an alternate exchange - do we ack?
+ */

Added: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1065700&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Mon Jan 31 18:09:51 2011
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueueFlowLimit_
+#define _QueueFlowLimit_
+
+#include <list>
+#include <set>
+#include <iostream>
+#include <memory>
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Producer flow control: when level is > flowStop*, flow control is ON.
+ * When level is < flowResume*, flow control is OFF.  A threshold of 0 means
+ * that threshold is not used.  If both byte and msg count thresholds are set,
+ * then passing _either_ stop level may turn flow control ON, but _both_ must
+ * be below their resume level before flow control will be turned OFF.
+ */
+class QueueFlowLimit
+{
+    std::string queueName;
+
+    uint32_t flowStopCount;
+    uint32_t flowResumeCount;
+    uint64_t flowStopSize;
+    uint64_t flowResumeSize;
+    bool flowStopped;   // true = producers held in flow control
+
+    // current queue utilization
+    uint32_t count;
+    uint64_t size;
+
+  public:
+    static QPID_BROKER_EXTERN const std::string flowStopCountKey;
+    static QPID_BROKER_EXTERN const std::string flowResumeCountKey;
+    static QPID_BROKER_EXTERN const std::string flowStopSizeKey;
+    static QPID_BROKER_EXTERN const std::string flowResumeSizeKey;
+
+    virtual ~QueueFlowLimit() {}
+
+    /** the queue has added QueuedMessage */
+    void consume(const QueuedMessage&);
+    /** the queue has removed QueuedMessage */
+    void replenish(const QueuedMessage&);
+
+    uint32_t getFlowStopCount() const { return flowStopCount; }
+    uint32_t getFlowResumeCount() const { return flowResumeCount; }
+    uint64_t getFlowStopSize() const { return flowStopSize; }
+    uint64_t getFlowResumeSize() const { return flowResumeSize; }
+    bool isFlowControlActive() const { return flowStopped; }
+    bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
+
+    void encode(framing::Buffer& buffer) const;
+    void decode(framing::Buffer& buffer);
+    uint32_t encodedSize() const;
+
+    static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue
*queue, const qpid::framing::FieldTable& settings);
+    friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const
QueueFlowLimit&);
+
+ protected:
+    // msgs waiting for flow to become available.
+    std::list< boost::intrusive_ptr<Message> > pendingFlow;     // ordered, oldest @front
+    std::set< boost::intrusive_ptr<Message> > index;
+    qpid::sys::Mutex pendingFlowLock;
+
+    QueueFlowLimit(Queue *queue,
+                   uint32_t flowStopCount, uint32_t flowResumeCount,
+                   uint64_t flowStopSize,  uint64_t flowResumeSize);
+};
+
+}}
+
+
+#endif

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am?rev=1065700&r1=1065699&r2=1065700&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am Mon Jan 31 18:09:51 2011
@@ -98,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	MessageTest.cpp \
 	QueueRegistryTest.cpp \
 	QueuePolicyTest.cpp \
+	QueueFlowLimitTest.cpp \
 	FramingTest.cpp \
 	HeaderTest.cpp \
 	SequenceNumberTest.cpp \

Added: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1065700&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Mon Jan 31 18:09:51 2011
@@ -0,0 +1,307 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sstream>
+#include <deque>
+#include "unit_test.h"
+#include "test_tools.h"
+
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "MessageUtils.h"
+#include "BrokerFixture.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(QueueFlowLimitTestSuite)
+
+QueuedMessage createMessage(uint32_t size)
+{
+    QueuedMessage msg;
+    msg.payload = MessageUtils::createMessage();
+    MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+    return msg;
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowCount)
+{
+    FieldTable args;
+    args.setInt(QueueFlowLimit::flowStopCountKey, 7);
+    args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
+
+    std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+    BOOST_CHECK_EQUAL((uint32_t) 7, flow->getFlowStopCount());
+    BOOST_CHECK_EQUAL((uint32_t) 5, flow->getFlowResumeCount());
+    BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopSize());
+    BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeSize());
+    BOOST_CHECK(!flow->isFlowControlActive());
+    BOOST_CHECK(flow->monitorFlowControl());
+
+    std::deque<QueuedMessage> msgs;
+    for (size_t i = 0; i < 6; i++) {
+        msgs.push_back(createMessage(10));
+        flow->consume(msgs.back());
+        BOOST_CHECK(!flow->isFlowControlActive());
+    }
+    BOOST_CHECK(!flow->isFlowControlActive());  // 6 on queue
+    msgs.push_back(createMessage(10));
+    flow->consume(msgs.back());
+    BOOST_CHECK(!flow->isFlowControlActive());  // 7 on queue
+    msgs.push_back(createMessage(10));
+    flow->consume(msgs.back());
+    BOOST_CHECK(flow->isFlowControlActive());   // 8 on queue, ON
+    msgs.push_back(createMessage(10));
+    flow->consume(msgs.back());
+    BOOST_CHECK(flow->isFlowControlActive());   // 9 on queue
+
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 8 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 7 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 6 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 5 on queue
+
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(!flow->isFlowControlActive());  // 4 on queue, OFF
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowSize)
+{
+    FieldTable args;
+    args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70);
+    args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50);
+
+    std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+    BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
+    BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
+    BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize());
+    BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize());
+    BOOST_CHECK(!flow->isFlowControlActive());
+    BOOST_CHECK(flow->monitorFlowControl());
+
+    std::deque<QueuedMessage> msgs;
+    for (size_t i = 0; i < 6; i++) {
+        msgs.push_back(createMessage(10));
+        flow->consume(msgs.back());
+        BOOST_CHECK(!flow->isFlowControlActive());
+    }
+    BOOST_CHECK(!flow->isFlowControlActive());  // 60 on queue
+    QueuedMessage msg_9 = createMessage(9);
+    flow->consume(msg_9);
+    BOOST_CHECK(!flow->isFlowControlActive());  // 69 on queue
+    QueuedMessage tinyMsg_1 = createMessage(1);
+    flow->consume(tinyMsg_1);
+    BOOST_CHECK(!flow->isFlowControlActive());   // 70 on queue
+
+    QueuedMessage tinyMsg_2 = createMessage(1);
+    flow->consume(tinyMsg_2);
+    BOOST_CHECK(flow->isFlowControlActive());   // 71 on queue, ON
+    msgs.push_back(createMessage(10));
+    flow->consume(msgs.back());
+    BOOST_CHECK(flow->isFlowControlActive());   // 81 on queue
+
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 71 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 61 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // 51 on queue
+
+    flow->replenish(tinyMsg_1);
+    BOOST_CHECK(flow->isFlowControlActive());   // 50 on queue
+    flow->replenish(tinyMsg_2);
+    BOOST_CHECK(!flow->isFlowControlActive());   // 49 on queue, OFF
+
+    flow->replenish(msg_9);
+    BOOST_CHECK(!flow->isFlowControlActive());  // 40 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(!flow->isFlowControlActive());  // 30 on queue
+    flow->replenish(msgs.front());
+    msgs.pop_front();
+    BOOST_CHECK(!flow->isFlowControlActive());  // 20 on queue
+}
+
+QPID_AUTO_TEST_CASE(testFlowArgs)
+{
+    FieldTable args;
+    const uint64_t stop(0x2FFFFFFFF);
+    const uint64_t resume(0x1FFFFFFFF);
+    args.setInt(QueueFlowLimit::flowStopCountKey, 30);
+    args.setInt(QueueFlowLimit::flowResumeCountKey, 21);
+    args.setUInt64(QueueFlowLimit::flowStopSizeKey, stop);
+    args.setUInt64(QueueFlowLimit::flowResumeSizeKey, resume);
+
+    std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+    BOOST_CHECK_EQUAL((uint32_t) 30, flow->getFlowStopCount());
+    BOOST_CHECK_EQUAL((uint32_t) 21, flow->getFlowResumeCount());
+    BOOST_CHECK_EQUAL(stop, flow->getFlowStopSize());
+    BOOST_CHECK_EQUAL(resume, flow->getFlowResumeSize());
+    BOOST_CHECK(!flow->isFlowControlActive());
+    BOOST_CHECK(flow->monitorFlowControl());
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowCombo)
+{
+    FieldTable args;
+    args.setInt(QueueFlowLimit::flowStopCountKey, 10);
+    args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
+    args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
+    args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
+
+    std::deque<QueuedMessage> msgs_1;
+    std::deque<QueuedMessage> msgs_10;
+    std::deque<QueuedMessage> msgs_50;
+    std::deque<QueuedMessage> msgs_100;
+
+    QueuedMessage msg;
+
+    std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+    BOOST_CHECK(!flow->isFlowControlActive());        // count:0  size:0
+
+    // verify flow control comes ON when only count passes its stop point.
+
+    for (size_t i = 0; i < 10; i++) {
+        msgs_10.push_back(createMessage(10));
+        flow->consume(msgs_10.back());
+        BOOST_CHECK(!flow->isFlowControlActive());
+    }
+    // count:10 size:100
+
+    msgs_1.push_back(createMessage(1));
+    flow->consume(msgs_1.back());  // count:11 size: 101  ->ON
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    for (size_t i = 0; i < 6; i++) {
+        flow->replenish(msgs_10.front());
+        msgs_10.pop_front();
+        BOOST_CHECK(flow->isFlowControlActive());
+    }
+    // count:5 size: 41
+
+    flow->replenish(msgs_1.front());        // count: 4 size: 40  ->OFF
+    msgs_1.pop_front();
+    BOOST_CHECK(!flow->isFlowControlActive());
+
+    for (size_t i = 0; i < 4; i++) {
+        flow->replenish(msgs_10.front());
+        msgs_10.pop_front();
+        BOOST_CHECK(!flow->isFlowControlActive());
+    }
+    // count:0 size:0
+
+    // verify flow control comes ON when only size passes its stop point.
+
+    msgs_100.push_back(createMessage(100));
+    flow->consume(msgs_100.back());  // count:1 size: 100
+    BOOST_CHECK(!flow->isFlowControlActive());
+
+    msgs_50.push_back(createMessage(50));
+    flow->consume(msgs_50.back());   // count:2 size: 150
+    BOOST_CHECK(!flow->isFlowControlActive());
+
+    msgs_50.push_back(createMessage(50));
+    flow->consume(msgs_50.back());   // count:3 size: 200
+    BOOST_CHECK(!flow->isFlowControlActive());
+
+    msgs_1.push_back(createMessage(1));
+    flow->consume(msgs_1.back());   // count:4 size: 201  ->ON
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    flow->replenish(msgs_100.front());              // count:3 size:101
+    msgs_100.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    flow->replenish(msgs_1.front());                // count:2 size:100
+    msgs_1.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    flow->replenish(msgs_50.front());               // count:1 size:50  ->OFF
+    msgs_50.pop_front();
+    BOOST_CHECK(!flow->isFlowControlActive());
+
+    // verify flow control remains ON until both thresholds drop below their
+    // resume point.
+
+    for (size_t i = 0; i < 8; i++) {
+        msgs_10.push_back(createMessage(10));
+        flow->consume(msgs_10.back());
+        BOOST_CHECK(!flow->isFlowControlActive());
+    }
+    // count:9 size:130
+
+    msgs_10.push_back(createMessage(10));
+    flow->consume(msgs_10.back());              // count:10 size: 140
+    BOOST_CHECK(!flow->isFlowControlActive());
+
+    msgs_1.push_back(createMessage(1));
+    flow->consume(msgs_1.back());               // count:11 size: 141  ->ON
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    msgs_100.push_back(createMessage(100));
+    flow->consume(msgs_100.back());     // count:12 size: 241  (both thresholds crossed)
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
+
+    flow->replenish(msgs_50.front());               // count:11 size:191
+    msgs_50.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());
+
+    for (size_t i = 0; i < 9; i++) {
+        flow->replenish(msgs_10.front());
+        msgs_10.pop_front();
+        BOOST_CHECK(flow->isFlowControlActive());
+    }
+    // count:2 size:101
+    flow->replenish(msgs_1.front());                // count:1 size:100
+    msgs_1.pop_front();
+    BOOST_CHECK(flow->isFlowControlActive());   // still active due to size
+
+    flow->replenish(msgs_100.front());               // count:0 size:0  ->OFF
+    msgs_100.pop_front();
+    BOOST_CHECK(!flow->isFlowControlActive());
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message