qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r834662 - in /qpid/trunk/qpid/cpp/src/tests: brokertest.py cluster_tests.fail cluster_tests.py run_cluster_tests
Date Tue, 10 Nov 2009 20:57:30 GMT
Author: aconway
Date: Tue Nov 10 20:57:30 2009
New Revision: 834662

URL: http://svn.apache.org/viewvc?rev=834662&view=rev
Log:
Fixed test_failover, added ErrorGenerator with test.

Modified:
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_cluster_tests

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=834662&r1=834661&r2=834662&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Nov 10 20:57:30 2009
@@ -21,7 +21,7 @@
 # or federation
 
 import os, signal, string, tempfile, popen2, socket, threading, time
-import qpid
+import qpid, traceback
 from qpid import connection, messaging, util
 from qpid.compat import format_exc
 from qpid.harness import Skipped
@@ -41,13 +41,14 @@
     except:
         return False
 
-class Unexpected(Exception):
+class BadProcessStatus(Exception):
     pass
 
 class Popen(popen2.Popen3):
     """
     Similar to subprocess.Popen but using popen2 classes for portability.
     Can set and verify expectation of process status at end of test.
+    Dumps command line, stdout, stderr to data dir for debugging.
     """
 
     def __init__(self, cmd, expect=EXPECT_EXIT_OK):
@@ -57,17 +58,25 @@
         self.stdin = self.tochild
         self.stdout = self.fromchild
         self.stderr = self.childerr
+        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
+        self.dump(self.cmd_str(), "cmd")
+
+    def dump(self, str, ext):
+        f = file("%s.%s" % (self.pname, ext), "w")
+        f.write(str)
+        f.close()
 
     def unexpected(self,msg):
-        raise Unexpected("%s: %s\n--stdout:\n%s\n--stderr:\n%s" %
-                         (msg, self.cmd_str(), self.stdout.read(), self.stderr.read()))
+        self.dump(self.stdout.read(), "out")
+        self.dump(self.stderr.read(), "err")
+        raise BadProcessStatus("%s: %s" % (msg, self.pname))
     
-    def testend(self):                  # Clean up at end of test.
+    def stop(self):                  # Clean up at end of test.
         if self.expect == EXPECT_RUNNING:
             try:
                 self.kill()
             except:
-                self.unexpected("Expected running but exited %d" % self.wait())
+                self.unexpected("Exit code %d" % self.wait())
         else:
             # Give the process some time to exit.
             delay = 0.1
@@ -76,11 +85,11 @@
                 delay *= 2
             if self.returncode is None: # Still haven't stopped
                 self.kill()
-                self.unexpected("Expected to exit but still running")
+                self.unexpected("Still running")
             elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
-                self.unexpected("Expected exit ok but exited %d" % self.returncode)
+                self.unexpected("Exit code %d" % self.returncode)
             elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
-                self.unexpected("Expected to fail but exited ok")
+                self.unexpected("Expected error")
                
     def communicate(self, input=None):
         if input:
@@ -107,8 +116,6 @@
 
     def terminate(self): self.send_signal(signal.SIGTERM)
     def kill(self): self.send_signal(signal.SIGKILL)
-        
-        
 
     def cmd_str(self): return " ".join([str(s) for s in self.cmd])
 
@@ -133,19 +140,23 @@
         else:
             self.name = "broker%d" % Broker._broker_count
             Broker._broker_count += 1
-        self.log = os.path.join(test.dir, self.name+".log")
-        cmd += ["--log-to-file", self.log, "--log-prefix", self.name,"--log-to-stderr=no"]
-        self.datadir = os.path.join(test.dir, self.name)
+        self.log = self.name+".log"
+        cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
+        cmd += ["--log-to-stderr=no"] 
+        self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
         if self._store_lib: cmd += ["--load-module", self._store_lib]
 
         Popen.__init__(self, cmd, expect)
         try: self.port = int(self.stdout.readline())
-        except Exception:
-            raise Exception("Failed to start broker, log: "+self.log)
-        test.cleanup_popen(self)
+        except Exception, e:
+            raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname))
+        test.cleanup_stop(self)
         self.host = "localhost"         # Placeholder for remote brokers.
 
+    def unexpected(self,msg):
+        raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
+
     def connect(self):
         """New API connection to the broker."""
         return messaging.Connection.open(self.host, self.port)
@@ -168,6 +179,12 @@
         s.sender(queue+" {create:always}").send(message)
         s.connection.close()
 
+    def send_messages(self, queue, messages):
+        s = self.connect().session()
+        sender = s.sender(queue+" {create:always}")
+        for m in messages: sender.send(m)
+        s.connection.close()
+
     def get_message(self, queue):
         s = self.connect().session()
         m = s.receiver(queue+" {create:always}", capacity=1).fetch(timeout=1)
@@ -175,6 +192,14 @@
         s.connection.close()
         return m
 
+    def get_messages(self, queue, n):
+        s = self.connect().session()
+        receiver = s.receiver(queue+" {create:always}", capacity=n)
+        m = [receiver.fetch(timeout=1) for i in range(n)]
+        s.acknowledge()
+        s.connection.close()
+        return m
+        
 class Cluster:
     """A cluster of brokers in a test."""
 
@@ -217,9 +242,7 @@
     Provides a well-known working directory for each test.
     """
 
-    # FIXME aconway 2009-11-05: too many env vars, need a simpler
-    # scheme for locating exes and libs
-
+    # Environment settings.
     cluster_lib = os.getenv("CLUSTER_LIB")
     xml_lib = os.getenv("XML_LIB")
     qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -227,36 +250,39 @@
     receiver_exec = os.getenv("RECEIVER_EXEC")
     sender_exec = os.getenv("SENDER_EXEC")
     store_lib = os.getenv("STORE_LIB")
-
+    
+    rootdir = os.getcwd()
     def configure(self, config): self.config=config
     
     def setUp(self):
-        self.dir = os.path.join(self.config.defines["OUTDIR"], self.id())
+        self.dir = os.path.join(self.rootdir, self.config.defines["OUTDIR"], self.id())
         os.makedirs(self.dir)
-        self.popens = []
+        os.chdir(self.dir)
+        self.stopem = []                # things to stop at end of test
 
     def tearDown(self):
         err = []
-        for p in self.popens:
-            try: p.testend()
-            except Unexpected, e: err.append(str(e))
-        if err: raise Exception("\n".join(err))
+        for p in self.stopem:
+            try: p.stop()
+            except Exception, e: err.append(str(e))
+        if err: raise Exception("\n    ".join(err))
 
     # FIXME aconway 2009-11-06: check for core files of exited processes.
     
-    def cleanup_popen(self, popen):
-        """Add process to be killed at end of test"""
-        self.popens.append(popen)
+    def cleanup_stop(self, stopable):
+        """Call thing.stop at end of test"""
+        self.stopem.append(stopable)
 
     def popen(self, cmd, expect=EXPECT_EXIT_OK):
-        """Start a process that will be killed at end of test"""
+        """Start a process that will be killed at end of test, in the test dir."""
+        os.chdir(self.dir)
         p = Popen(cmd, expect)
-        self.cleanup_popen(p)
+        self.cleanup_stop(p)
         return p
 
     def broker(self, args=[], name=None, expect=EXPECT_RUNNING):
         """Create and return a broker ready for use"""
-        b = Broker(self, args, name, expect)
+        b = Broker(self, args=args, name=name, expect=expect)
         b.connect().close()
         return b
 
@@ -266,6 +292,11 @@
         cluster.wait()
         return cluster
 
+class RethrownException(Exception):
+    """Captures the original stack trace to be thrown later""" 
+    def __init__(self, e):
+        Exception.__init__(self, format_exc())
+
 class StoppableThread(Thread):
     """
     Base class for threads that do something in a loop and periodically check
@@ -281,7 +312,7 @@
         self.join()
         if self.error: raise self.error
     
-class Sender(StoppableThread):
+class NumberedSender(StoppableThread):
     """
     Thread to run a sender client and send numbered messages until stopped.
     """
@@ -300,7 +331,7 @@
                 self.sent += 1
         except Exception, e: self.error = RethrownException(e)
 
-class Receiver(Thread):
+class NumberedReceiver(Thread):
     """
     Thread to run a receiver client and verify it receives
     sequentially numbered messages.
@@ -320,8 +351,9 @@
             while self.stopat is None or self.received < self.stopat:
                 self.lock.acquire()
                 try:
-                    self.test.assertEqual(self.receiver.stdout.readline(), str(self.received)+"\n")
-                    self.received += 1
+                    m = int(self.receiver.stdout.readline())
+                    assert(m <= self.received) # Allow for duplicates
+                    if (m == self.received): self.received += 1
                 finally:
                     self.lock.release()
         except Exception, e:
@@ -335,7 +367,27 @@
         self.join()
         if self.error: raise self.error
 
-class RethrownException(Exception):
-    """Captures the original stack trace to be thrown later""" 
-    def __init__(self, e):
-        Exception.__init__(self, format_exc())
+class ErrorGenerator(StoppableThread):
+    """
+    Thread that continuously generates errors by trying to consume from
+    a non-existent queue. For cluster regression tests, error handling
+    caused issues in the past.
+    """
+
+    def __init__(self, broker):
+        StoppableThread.__init__(self)
+        self.broker=broker
+        broker.test.cleanup_stop(self)
+        self.start()
+        
+    def run(self):
+        c = self.broker.connect_old()
+        try:
+            while not self.stopped:
+                try:
+                    c.session(str(qpid.datatypes.uuid4())).message_subscribe(
+                        queue="non-existent-queue")
+                    assert(False)
+                except qpid.session.SessionException: pass
+        except: pass                    # Normal if broker is killed.
+

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail?rev=834662&r1=834661&r2=834662&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail Tue Nov 10 20:57:30 2009
@@ -1,2 +1,2 @@
-cluster_tests.ClusterTests.test_failover
+
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=834662&r1=834661&r2=834662&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Nov 10 20:57:30 2009
@@ -53,28 +53,42 @@
         s2.connection.close()
 
     def test_failover(self):
-        """Test fail-over during continuous send-receive"""
-        # FIXME aconway 2009-11-09: this test is failing, showing lost messages.
-        # Enable when fixed
+        """Test fail-over during continuous send-receive with errors"""
 
         # Original cluster will all be killed so expect exit with failure
         cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
 
+
         # Start sender and receiver threads
         cluster[0].declare_queue("test-queue")
-        self.receiver = Receiver(cluster[1])
-        self.receiver.start()
-        self.sender = Sender(cluster[2])
-        self.sender.start()
+        receiver = NumberedReceiver(cluster[1])
+        receiver.start()
+        sender = NumberedSender(cluster[2])
+        sender.start()
 
         # Kill original brokers, start new ones.
         for i in range(3):
             cluster[i].kill()
-            cluster.start()
+            b = cluster.start()
             time.sleep(1)
 
-        self.sender.stop()
-        self.receiver.stop(self.sender.sent)
+        sender.stop()
+        receiver.stop(sender.sent)
+
+    def send_receive_verify(self, b1, b2, queue, msgs):
+        b1.send_messages(queue, msgs)
+        self.assertEqual(msgs, [ m.content for m in b2.get_messages(queue,len(msgs))])
+        
+    def test_error_storm(self):
+        """Verify cluster behaves with clients generating a lot of errors."""
+        cluster = self.cluster(3)
+        errgen = [ ErrorGenerator(b) for b in cluster ]
+        msgs = [ str(i) for i in range(10) ]
+        self.send_receive_verify(cluster[0], cluster[1], "q", msgs)
+        self.send_receive_verify(cluster[1], cluster[2], "q", msgs)
+        for i in range(3):
+            cluster.start()
+            self.send_receive_verify(cluster[1], cluster[2], "q", msgs)
 
 
 class ClusterStoreTests(BrokerTest):

Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=834662&r1=834661&r2=834662&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Tue Nov 10 20:57:30 2009
@@ -45,7 +45,7 @@
 mkdir -p $OUTDIR
 
 # Ignore tests requiring a store by default. 
-TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail"
+TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail $*"
 
 with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $TESTS || exit 1
 rm -rf $OUTDIR



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message