qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1171175 - /qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
Date Thu, 15 Sep 2011 17:03:11 GMT
Author: gsim
Date: Thu Sep 15 17:03:11 2011
New Revision: 1171175

URL: http://svn.apache.org/viewvc?rev=1171175&view=rev
Log:
QPID-3488: Added test case

Modified:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1171175&r1=1171174&r2=1171175&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Thu Sep 15 17:03:11 2011
@@ -508,6 +508,47 @@ class MessageTests(TestBase010):
 
         msgB = q.get(timeout=10)
 
+    def test_window_stop(self):
+        """
+        Ensure window based flow control reacts to stop correctly
+        """
+        session = self.session
+        #setup subscriber on a test queue
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 1, destination = "c")
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination =
"c")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination
= "c")
+
+
+        #send batch of messages to queue
+        for i in range(0, 10):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"Message %d" % (i+1)))
+
+        #retrieve all delivered messages
+        q = session.incoming("c")
+        for i in range(0, 5):
+            msg = q.get(timeout = 1)
+            session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+            self.assertDataEquals(session, msg, "Message %d" % (i+1))
+
+        session.message_stop(destination = "c")
+
+        #now send completions, normally used to move window forward,
+        #but after a stop should not do so
+        session.channel.session_completed(session.receiver._completed)
+
+        #check no more messages are sent
+        self.assertEmpty(q)
+
+        #re-establish window and check remaining messages
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination =
"c")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination
= "c")
+        for i in range(0, 5):
+            msg = q.get(timeout = 1)
+            self.assertDataEquals(session, msg, "Message %d" % (i+6))
+
+
     def test_subscribe_not_acquired(self):
         """
         Test the not-acquired modes works as expected for a simple case



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message