qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r883909 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py python/qpid/brokertest.py
Date Tue, 24 Nov 2009 22:40:53 GMT
Author: aconway
Date: Tue Nov 24 22:40:53 2009
New Revision: 883909

URL: http://svn.apache.org/viewvc?rev=883909&view=rev
Log:
Added flow control to failover_test in cluster_tests.py.

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=883909&r1=883908&r2=883909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Nov 24 22:40:53 2009
@@ -73,9 +73,9 @@
 
         # Start sender and receiver threads
         cluster[0].declare_queue("test-queue")
-        receiver = NumberedReceiver(cluster[1])
+        sender = NumberedSender(cluster[1], 1000) # Max queue depth
+        receiver = NumberedReceiver(cluster[2], sender)
         receiver.start()
-        sender = NumberedSender(cluster[2])
         sender.start()
 
         # Kill original brokers, start new ones for the duration.

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=883909&r1=883908&r2=883909&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Tue Nov 24 22:40:53 2009
@@ -324,26 +324,48 @@
     Thread to run a sender client and send numbered messages until stopped.
     """
 
-    def __init__(self, broker):
+    def __init__(self, broker, max_depth=None):
+        """
+        max_depth: enable flow control, ensure sent - received <= max_depth.
+        Requires self.received(n) to be called each time messages are received.
+        """
         StoppableThread.__init__(self)
         self.sender = broker.test.popen(
             [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
+        self.condition = Condition()
+        self.max = max_depth
+        self.received = 0
 
     def run(self):
         try:
             self.sent = 0
             while not self.stopped:
+                if self.max:
+                    self.condition.acquire()
+                    while self.sent - self.received > self.max:
+                        self.condition.wait()
+                    self.condition.release()
                 self.sender.stdin.write(str(self.sent)+"\n")
                 self.sender.stdin.flush()
                 self.sent += 1
         except Exception, e: self.error = RethrownException(e, self.sender.pname)
 
+    def notify_received(self, count):
+        """Called by receiver to enable flow control. count = messages received so far."""
+        self.condition.acquire()
+        self.received = count
+        self.condition.notify()
+        self.condition.release()
+        
 class NumberedReceiver(Thread):
     """
     Thread to run a receiver client and verify it receives
     sequentially numbered messages.
     """
-    def __init__(self, broker):
+    def __init__(self, broker, sender = None):
+        """
+        sender: enable flow control. Call sender.received(n) for each message received.
+        """
         Thread.__init__(self)
         self.test = broker.test
         self.receiver = self.test.popen(
@@ -351,7 +373,8 @@
         self.stopat = None
         self.lock = Lock()
         self.error = None
-
+        self.sender = sender
+        
     def run(self):
         try:
             self.received = 0
@@ -360,7 +383,10 @@
                 try:
                     m = int(self.receiver.stdout.readline())
                     assert(m <= self.received) # Allow for duplicates
-                    if (m == self.received): self.received += 1
+                    if (m == self.received):
+                        self.received += 1
+                        if self.sender:
+                            self.sender.notify_received(self.received)
                 finally:
                     self.lock.release()
         except Exception, e:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message