qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1099957 - in /qpid/trunk/qpid/cpp/src/tests: queue_flow_limit_tests.py run_queue_flow_limit_tests
Date Thu, 05 May 2011 20:26:09 GMT
Author: kgiusti
Date: Thu May  5 20:26:09 2011
New Revision: 1099957

URL: http://svn.apache.org/viewvc?rev=1099957&view=rev
Log:
QPID-3243: unit test to verify fix.

Modified:
    qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_queue_flow_limit_tests

Modified: qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1099957&r1=1099956&r2=1099957&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py Thu May  5 20:26:09 2011
@@ -37,7 +37,8 @@ class QueueFlowLimitTests(TestBase010):
 
     def _create_queue(self, name,
                      stop_count=None, resume_count=None,
-                     stop_size=None, resume_size=None):
+                     stop_size=None, resume_size=None,
+                     max_size=None, max_count=None):
         """ Create a queue with the given flow settings via the queue.declare
         command.
         """
@@ -50,6 +51,11 @@ class QueueFlowLimitTests(TestBase010):
             args["qpid.flow_stop_size"] = stop_size;
         if (resume_size is not None):
             args["qpid.flow_resume_size"] = resume_size;
+        if (max_size is not None):
+            args["qpid.max_size"] = max_size;
+        if (max_count is not None):
+            args["qpid.max_count"] = max_count;
+
 
         self.session.queue_declare(queue=name, arguments=args)
 
@@ -65,6 +71,10 @@ class QueueFlowLimitTests(TestBase010):
                     self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size)
                 if (resume_size is not None):
                     self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size)
+                if (max_size is not None):
+                    self.assertEqual(i.arguments.get("qpid.max_size"), max_size)
+                if (max_count is not None):
+                    self.assertEqual(i.arguments.get("qpid.max_count"), max_count)
                 self.failIf(i.flowStopped)
                 return i.getObjectId()
         self.fail("Unable to create queue '%s'" % name)
@@ -77,7 +87,7 @@ class QueueFlowLimitTests(TestBase010):
         self.session.queue_delete(queue=name)
 
 
-    def _start_qpid_send(self, queue, count, content="X", capacity=10):
+    def _start_qpid_send(self, queue, count, content="X", capacity=100):
         """ Use the qpid-send client to generate traffic to a queue.
         """
         command = "qpid-send" + \
@@ -129,27 +139,6 @@ class QueueFlowLimitTests(TestBase010):
             self.assertEqual(i.name, "test01")
             self._delete_queue("test01")
 
-            # now verify that the default ratios are applied if max sizing is specified:
-            command = tool + \
-                " --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \
-                + "add queue test02 --max-queue-count=10000 --max-queue-size=1000000"
-            cmd = popen(command)
-            rc = cmd.close()
-            self.assertEqual(rc, None)
-
-            # now verify the settings
-            qs = self.qmf.getObjects(_class="queue")
-            for i in qs:
-                if i.name == "test02":
-                    ## @todo KAG: can't get the flow size from qmf!  Arrgh!
-                    # no way to verify...
-                    #self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
-                    #self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
-                    self.failIf(i.flowStopped)
-                    break;
-            self.assertEqual(i.name, "test02")
-            self._delete_queue("test02")
-
 
     def test_flow_count(self):
         """ Create a queue with count-based flow limit.  Spawn several
@@ -167,7 +156,7 @@ class QueueFlowLimitTests(TestBase010):
 
         # wait until flow control is active
         deadline = time() + 10
-        while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+        while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
                 time() < deadline:
             pass
         self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
@@ -217,11 +206,10 @@ class QueueFlowLimitTests(TestBase010):
         sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
         sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
         totalMsgs = 1699 + 1129 + 881
-        totalBytes = 439 + 631 + 823
 
         # wait until flow control is active
         deadline = time() + 10
-        while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+        while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
                 time() < deadline:
             pass
         self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
@@ -258,5 +246,84 @@ class QueueFlowLimitTests(TestBase010):
         self._delete_queue("test-q")
 
 
+    def verify_limit(self, testq):
+        """ run a limit check against the testq object
+        """
+
+        testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0]
+
+        # fill up the queue, waiting until flow control is active
+        sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
+        deadline = time() + 10
+        while (not testq.mgmt.flowStopped) and time() < deadline:
+            testq.mgmt.update()
+
+        self.failUnless(testq.verifyStopped())
+
+        # now consume enough messages to drop below the flow resume point, and
+        # verify flow control is released.
+        rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount)
+        rcvr.readlines()    # prints a line for each received msg
+        rcvr.close();
+
+        # we should now be below the resume threshold
+        self.failUnless(testq.verifyResumed())
+
+        self._delete_queue(testq.mgmt.name)
+        sndr1.close();
+
+
+    def test_default_flow_count(self):
+        """ Create a queue with count-based size limit, and verify the computed
+        thresholds using the broker's default ratios.
+        """
+        class TestQ:
+            def __init__(self, oid):
+                # Use the broker-wide default flow thresholds of 80%/70% (see
+                # run_queue_flow_limit_tests) to base the thresholds off the
+                # queue's max_count configuration parameter
+                # max_count == 1000 -> stop == 800, resume == 700
+                self.oid = oid
+                self.sendCount = 1000
+                self.consumeCount = 301 # (send - resume) + 1 to reenable flow
+                self.content = "X"
+            def verifyStopped(self):
+                self.mgmt.update()
+                return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800)
+            def verifyResumed(self):
+                self.mgmt.update()
+                return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700)
+
+        self.startQmf();
+        oid = self._create_queue("test-X", max_count=1000)
+        self.verify_limit(TestQ(oid))
+
+
+    def test_default_flow_size(self):
+        """ Create a queue with byte-based size limit, and verify the computed
+        thresholds using the broker's default ratios.
+        """
+        class TestQ:
+            def __init__(self, oid):
+                # Use the broker-wide default flow thresholds of 80%/70% (see
+                # run_queue_flow_limit_tests) to base the thresholds off the
+                # queue's max_size configuration parameter
+                # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
+                self.oid = oid
+                self.sendCount = 2000
+                self.consumeCount = 601 # (send - resume) + 1 to reenable flow
+                self.content = "XXXXX"  # 5 bytes per message sent.
+            def verifyStopped(self):
+                self.mgmt.update()
+                return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000)
+            def verifyResumed(self):
+                self.mgmt.update()
+                return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000)
+
+        self.startQmf();
+        oid = self._create_queue("test-Y", max_size=10000)
+        self.verify_limit(TestQ(oid))
+
+
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/run_queue_flow_limit_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_queue_flow_limit_tests?rev=1099957&r1=1099956&r2=1099957&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_queue_flow_limit_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_queue_flow_limit_tests Thu May  5 20:26:09 2011
@@ -35,8 +35,10 @@ error() {
 }
 
 start_broker() {
+    # Note: if you change the DEFAULT_THRESHOLDS, you will need to update queue_flow_limit_tests.py
+    DEFAULT_THRESHOLDS="--default-flow-stop-threshold=80 --default-flow-resume-threshold=70"
     rm -rf $LOG_FILE
-    PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker"
+    PORT=$($QPIDD_EXEC $DEFAULT_THRESHOLDS --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker"
 }
 
 stop_broker() {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message