qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1536751 - in /qpid/trunk/qpid/cpp/src/tests: ha_test.py ha_tests.py
Date Tue, 29 Oct 2013 15:23:26 GMT
Author: aconway
Date: Tue Oct 29 15:23:25 2013
New Revision: 1536751

URL: http://svn.apache.org/r1536751
Log:
QPID-5139: Add unit test for deadlock caused by blocking HA commit.

Modified:
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1536751&r1=1536750&r2=1536751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Tue Oct 29 15:23:25 2013
@@ -128,6 +128,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
+                 "--log-enable=info+",
                  "--log-enable=debug+:ha::",
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1536751&r1=1536750&r2=1536751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Oct 29 15:23:25 2013
@@ -1304,8 +1304,6 @@ def open_read(name):
 
 class TransactionTests(HaBrokerTest):
 
-    load_store=["--load-module", BrokerTest.test_store_lib]
-
     def tx_simple_setup(self, broker):
         """Start a transaction, remove messages from queue a, add messages to queue b"""
         c = broker.connect()
@@ -1437,6 +1435,9 @@ class TransactionTests(HaBrokerTest):
         tx.sync()
         self.assert_simple_rollback_outcome(cluster[0], tx_queues)
 
+    def assert_commit_raises(self, tx):
+        def commit_sync(): tx.commit(timeout=1); tx.sync(timeout=1)
+        self.assertRaises(ServerError, commit_sync)
 
     def test_tx_backup_fail(self):
         cluster = HaCluster(
@@ -1445,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
         tx = c.session(transactional=True)
         s = tx.sender("q;{create:always,node:{durable:true}}")
         for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
-        self.assertRaises(ServerError, tx.commit)
+        self.assert_commit_raises(tx)
         for b in cluster: b.assert_browse_backup("q", [])
         self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
         self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
@@ -1463,7 +1464,7 @@ class TransactionTests(HaBrokerTest):
         self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
         cluster[1].kill(final=False)
         s.send("b")
-        self.assertRaises(ServerError, tx.commit)
+        self.assert_commit_raises(tx)
         self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
 
         # Joining
@@ -1475,6 +1476,24 @@ class TransactionTests(HaBrokerTest):
         # The new member is not in the tx but  receives the results normal replication.
         for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
 
+    def test_tx_block_threads(self):
+        """Verify that TXs blocked in commit don't deadlock."""
+        cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
+        n = 10                  # Number of concurrent transactions
+        sessions = [cluster[0].connect().session(transactional=True) for i in xrange(n)]
+        # Have the store delay the response for 10s
+        for s in sessions:
+            sn = s.sender("qq;{create:always,node:{durable:true}}")
+            sn.send(Message("foo", durable=True))
+        self.assertEqual(n, len(cluster[1].agent().tx_queues()))
+        threads = [ Thread(target=s.commit) for s in sessions]
+        for t in threads: t.start()
+        cluster[0].ready(timeout=1) # Check for deadlock
+        for b in cluster: b.assert_browse_backup('qq', ['foo']*n)
+        for t in threads: t.join()
+        for s in sessions: s.connection.close()
+
+
 if __name__ == "__main__":
     outdir = "ha_tests.tmp"
     shutil.rmtree(outdir, True)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message