qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1482881 - in /qpid/trunk/qpid/cpp/src: qpid/ha/BrokerReplicator.cpp qpid/ha/StatusCheck.cpp tests/brokertest.py tests/ha_test.py tests/ha_tests.py
Date Wed, 15 May 2013 15:03:45 GMT
Author: aconway
Date: Wed May 15 15:03:44 2013
New Revision: 1482881

URL: http://svn.apache.org/r1482881
Log:
QPID-4745: HA safe port allocation for brokers in HA tests.

Many HA tests use --port=0 to start a broker on an available port, but then need
to shutdown and restart the broker on the same port. This is not safe, on a busy
system it is possible for another process to take the port between the time the
broker is shut down and the time it is restarted.

The solution is to do bind(0) and listen in the python test framework (class
HaPort) and let the broker use the socket using qpidd --socket-fd. When the
broker is shut down the port remains bound by the python process. When the
broker is re-started it again is given access to the socket via --socket-fd.

Other changes
- move ha_store_tests into ha_tests.
- add heartbeats to avoid stalling.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1482881&r1=1482880&r2=1482881&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed May 15 15:03:44 2013
@@ -878,7 +878,7 @@ namespace {
 
 // Called by ConnectionObserver::disconnected, disconnected from the network side.
 void BrokerReplicator::disconnected() {
-    QPID_LOG(info, logPrefix << "Disconnected from " << primary);
+    QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
     connection = 0;
     // Clean up auto-delete queues
     vector<boost::shared_ptr<Exchange> > collect;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1482881&r1=1482880&r2=1482881&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Wed May 15 15:03:44 2013
@@ -103,6 +103,10 @@ void StatusCheckThread::run() {
     catch(const exception& e) {
         QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection
to " << url);
     }
+    try { c.close(); }
+    catch(const exception& e) {
+        QPID_LOG(warning, "Error closing status check connection to " << url);
+    }
     delete this;
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1482881&r1=1482880&r2=1482881&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Wed May 15 15:03:44 2013
@@ -134,14 +134,8 @@ class Popen(subprocess.Popen):
         self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
         if stdout == FILE: stdout = open(self.outfile("out"), "w")
         if stderr == FILE: stderr = open(self.outfile("err"), "w")
-        try:
-            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
-                                      stdin=stdin, stdout=stdout, stderr=stderr,
-                                      close_fds=True)
-        except ValueError: # Windows can't do close_fds
-            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
-                                      stdin=stdin, stdout=stdout, stderr=stderr)
-
+        subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+                                  stdin=stdin, stdout=stdout, stderr=stderr)
         f = open(self.outfile("cmd"), "w")
         try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
         finally: f.close()
@@ -180,6 +174,7 @@ class Popen(subprocess.Popen):
         finally:
             self.wait()                 # Clean up the process.
 
+
     def communicate(self, input=None):
         ret = subprocess.Popen.communicate(self, input)
         self.cleanup()

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1482881&r1=1482880&r2=1482881&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Wed May 15 15:03:44 2013
@@ -63,18 +63,57 @@ class Credentials(object):
 
     def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
 
+class HaPort:
+    """Many HA tests need to allocate a broker port dynamically and then kill
+    and restart a broker on that same port multiple times. qpidd --port=0 only
+    ensures the port for the initial broker process, subsequent brokers re-using
+    the same port may fail with "address already in use".
+
+    HaPort binds and listens to the port and returns a file descriptor to pass
+    to qpidd --socket-fd. It holds on to the port untill the end of the test so
+    the broker can restart multiple times.
+    """
+
+    def __init__(self, test, port=0):
+        """Bind and listen to port. port=0 allocates a port dynamically.
+        self.port is the allocated port, self.fileno is the file descriptor for
+        qpid --socket-fd."""
+
+        self.test = test
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.socket.bind(("", port))
+        self.socket.listen(5)
+        self.port = self.socket.getsockname()[1]
+        self.fileno = self.socket.fileno()
+        self.stopped = False
+        test.cleanup_stop(self) # Stop during test.tearDown
+
+    def stop(self):             # Called in tearDown
+        if not self.stopped:
+            self.stopped = True
+            self.socket.shutdown(socket.SHUT_RDWR)
+            self.socket.close()
+
+    def __str__(self): return "HaPort<port:%s, fileno:%s>"%(self.port, self.fileno)
+
+
 class HaBroker(Broker):
     """Start a broker with HA enabled
     @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
     """
-    def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
-                 client_credentials=None, **kwargs):
+    def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True,
+                 ha_replicate="all", client_credentials=None, **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+        ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
                  "--log-enable=debug+:ha::",
-                 # FIXME aconway 2012-02-13: workaround slow link failover.
+                 # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
+                 # Heartbeat and negotiate time are needed so that a broker wont
+                 # stall on an address that doesn't currently have a broker running.
+                 "--link-heartbeat-interval=1",
+                 "--max-negotiate-time=1000",
                  "--ha-cluster=%s"%ha_cluster]
         if ha_replicate is not None:
             args += [ "--ha-replicate=%s"%ha_replicate ]
@@ -89,7 +128,8 @@ acl allow all all
             aclf.close()
         if not "--acl-file" in args:
             args += [ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB") ]
-        Broker.__init__(self, test, args, **kwargs)
+        args += ["--socket-fd=%s"%ha_port.fileno, "--listen-disable=tcp"]
+        Broker.__init__(self, test, args, port=ha_port.port, **kwargs)
         self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
         assert os.path.exists(self.qpid_ha_path)
         self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
@@ -97,6 +137,7 @@ acl allow all all
         self.qpid_ha_script=import_script(self.qpid_ha_path)
         self._agent = None
         self.client_credentials = client_credentials
+        self.ha_port = ha_port
 
     def __str__(self): return Broker.__str__(self)
 
@@ -108,8 +149,7 @@ acl allow all all
             args = args + ["--sasl-mechanism", cred.mechanism]
         self.qpid_ha_script.main_except(["", "-b", url]+args)
 
-    def promote(self):
-        self.ready(); self.qpid_ha(["promote"])
+    def promote(self): self.ready(); self.qpid_ha(["promote"])
     def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
     def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
     def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
@@ -211,14 +251,16 @@ acl allow all all
     def ready(self):
         return Broker.ready(self, client_properties={"qpid.ha-admin":1})
 
-    def kill(self):
+    def kill(self, final=True):
+        if final: self.ha_port.stop()
         self._agent = None
         return Broker.kill(self)
 
+
 class HaCluster(object):
     _cluster_count = 0
 
-    def __init__(self, test, n, promote=True, wait=True, **kwargs):
+    def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs):
         """Start a cluster of n brokers.
 
         @test: The test being run
@@ -227,36 +269,49 @@ class HaCluster(object):
         @wait: wait for primary active and backups ready. Ignored if promote=False
         """
         self.test = test
+        self.args = args
         self.kwargs = kwargs
+        self._ports = [HaPort(test) for i in xrange(n)]
+        self._set_url()
         self._brokers = []
         self.id = HaCluster._cluster_count
         self.broker_id = 0
         HaCluster._cluster_count += 1
-        for i in xrange(n): self.start(False)
-        self.update_urls()
+        for i in xrange(n): self.start()
         if promote:
             self[0].promote()
             if wait:
                 self[0].wait_status("active")
                 for b in self[1:]: b.wait_status("ready")
 
-
     def next_name(self):
         name="cluster%s-%s"%(self.id, self.broker_id)
         self.broker_id += 1
         return name
 
-    def start(self, update_urls=True, args=[]):
-        """Start a new broker in the cluster"""
-        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+    def _ha_broker(self, ha_port, name):
+        b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
+                     args=self.args, **self.kwargs)
         b.ready()
+        return b
+
+    def start(self):
+        """Start a new broker in the cluster"""
+        i = len(self)
+        assert i <= len(self._ports)
+        if i == len(self._ports):
+            self._ports.append(HaPort(self.test))
+            self._set_url()
+            self._update_urls()
+        b = self._ha_broker(self._ports[i], self.next_name())
         self._brokers.append(b)
-        if update_urls: self.update_urls()
         return b
 
-    def update_urls(self):
-        self.url = ",".join([b.host_port() for b in self])
-        if len(self) > 1:          # No failover addresses on a 1 cluster.
+    def _set_url(self):
+        self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)
+
+    def _update_urls(self):
+        if len(self) > 1: # No failover addresses on a 1 cluster.
             for b in self:
                 b.set_brokers_url(self.url)
                 b.set_public_url(self.url)
@@ -265,29 +320,28 @@ class HaCluster(object):
         """Connect with reconnect_urls"""
         return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
 
-    def kill(self, i, promote_next=True):
+    def kill(self, i, promote_next=True, final=True):
         """Kill broker i, promote broker i+1"""
-        self[i].kill()
+        self[i].kill(final=final)
         if promote_next: self[(i+1) % len(self)].promote()
 
     def restart(self, i):
         """Start a broker with the same port, name and data directory. It will get
         a separate log file: foo.n.log"""
+        if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
         b = self._brokers[i]
-        self._brokers[i] = HaBroker(
-            self.test, name=b.name, port=b.port(), brokers_url=self.url,
-            **self.kwargs)
+        self._brokers[i] = self._ha_broker(self._ports[i], b.name)
         self._brokers[i].ready()
 
     def bounce(self, i, promote_next=True):
         """Stop and restart a broker in a cluster."""
         if (len(self) == 1):
-            self.kill(i, promote_next=False)
+            self.kill(i, promote_next=False, final=False)
             self.restart(i)
             self[i].ready()
             if promote_next: self[i].promote()
         else:
-            self.kill(i, promote_next)
+            self.kill(i, promote_next, final=False)
             self.restart(i)
 
     # Behave like a list of brokers.

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1482881&r1=1482880&r2=1482881&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed May 15 15:03:44 2013
@@ -230,25 +230,17 @@ class ReplicationTests(HaBrokerTest):
 
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
-        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
-        primary.promote()
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        url="%s,%s"%(primary.host_port(), backup.host_port())
-        primary.connect().session().sender("q;{create:always}")
-        backup.wait_backup("q")
-
-        sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
-        receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+        cluster = HaCluster(self, 2)
+        cluster[0].connect().session().sender("q;{create:always}")
+        cluster[1].wait_backup("q")
+        sender = NumberedSender(cluster[0], url=cluster.url, queue="q", failover_updates
= False)
+        receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q", failover_updates
= False)
         receiver.start()
         sender.start()
-        backup.wait_backup("q")
         assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
-
-        primary.kill()
-        assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
-        backup.promote()
-        n = receiver.received       # Make sure we are still running
-        assert retry(lambda: receiver.received > n + 10)
+        cluster.kill(0)
+        n = receiver.received
+        assert retry(lambda: receiver.received > n + 10) # Verify we are still going
         sender.stop()
         receiver.stop()
 
@@ -372,16 +364,14 @@ class ReplicationTests(HaBrokerTest):
 
     def test_priority(self):
         """Verify priority queues replicate correctly"""
-        primary  = HaBroker(self, name="primary")
-        primary.promote()
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        session = primary.connect().session()
+        cluster = HaCluster(self, 2)
+        session = cluster[0].connect().session()
         s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
         # Can't use browse_backup as browser sees messages in delivery order not priority.
-        backup.wait_backup("priority-queue")
-        r = backup.connect_admin().session().receiver("priority-queue")
+        cluster[1].wait_backup("priority-queue")
+        r = cluster[1].connect_admin().session().receiver("priority-queue")
         received = [r.fetch().priority for i in priorities]
         self.assertEqual(sorted(priorities, reverse=True), received)
 
@@ -480,9 +470,8 @@ class ReplicationTests(HaBrokerTest):
 
     def test_replicate_binding(self):
         """Verify that binding replication can be disabled"""
-        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
-        primary.promote()
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+        cluster = HaCluster(self, 2)
+        primary, backup = cluster[0], cluster[1]
         ps = primary.connect().session()
         ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all},
type:'fanout'}}}")
         ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
@@ -867,7 +856,7 @@ acl deny all all
         self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
 
         # Simulate the race by re-creating the objects before promoting the new primary
-        cluster.kill(0, False)
+        cluster.kill(0, promote_next=False)
         xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
         node = "node:{%s}"%(xdecl)
         sn = cluster[1].connect_admin().session()
@@ -946,9 +935,10 @@ class LongTests(HaBrokerTest):
 
         # Start sender and receiver threads
         n = 10
-        senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+        senders = [NumberedSender(brokers[0], url=brokers.url,
+                                  max_depth=1024, failover_updates=False,
                                  queue="test%s"%(i)) for i in xrange(n)]
-        receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+        receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
                                       failover_updates=False,
                                       queue="test%s"%(i)) for i in xrange(n)]
         for r in receivers: r.start()
@@ -987,10 +977,9 @@ class LongTests(HaBrokerTest):
                         brokers.bounce(victim) # Next one is promoted
                         primary = next
                     else:
-                        brokers.kill(victim, False)
+                        brokers.kill(victim, promote_next=False, final=False)
                         dead = victim
 
-                    # At this point the primary is running with 1 or 2 backups
                     # Make sure we are not stalled
                     map(wait_passed, receivers, checkpoint)
                     # Run another checkpoint to ensure things work in this configuration
@@ -1090,11 +1079,11 @@ class RecoveryTests(HaBrokerTest):
             # Create a queue before the failure.
             s1 = cluster.connect(0).session().sender("q1;{create:always}")
             for b in cluster: b.wait_backup("q1")
-            for i in xrange(100): s1.send(str(i))
+            for i in xrange(10): s1.send(str(i))
 
             # Kill primary and 2 backups
             cluster[3].wait_status("ready")
-            for i in [0,1,2]: cluster.kill(i, False)
+            for i in [0,1,2]: cluster.kill(i, promote_next=False, final=False)
             cluster[3].promote()    # New primary, backups will be 1 and 2
             cluster[3].wait_status("recovering")
 
@@ -1108,31 +1097,33 @@ class RecoveryTests(HaBrokerTest):
             s2 = cluster.connect(3).session().sender("q2;{create:always}")
 
             # Verify that messages sent are not completed
-            for i in xrange(100,200):
-                s1.send(str(i), sync=False);
-                s2.send(str(i), sync=False)
+            for i in xrange(10,20):
+                s1.send(str(i), sync=False, timeout=0.1);
+                s2.send(str(i), sync=False, timeout=0.1)
+
             assertSyncTimeout(s1)
-            self.assertEqual(s1.unsettled(), 100)
+            self.assertEqual(s1.unsettled(), 10)
             assertSyncTimeout(s2)
-            self.assertEqual(s2.unsettled(), 100)
+            self.assertEqual(s2.unsettled(), 10)
 
             # Verify we can receive even if sending is on hold:
-            cluster[3].assert_browse("q1", [str(i) for i in range(200)])
+            cluster[3].assert_browse("q1", [str(i) for i in range(10)])
 
             # Restart backups, verify queues are released only when both backups are up
             cluster.restart(1)
             assertSyncTimeout(s1)
-            self.assertEqual(s1.unsettled(), 100)
+            self.assertEqual(s1.unsettled(), 10)
             assertSyncTimeout(s2)
-            self.assertEqual(s2.unsettled(), 100)
+            self.assertEqual(s2.unsettled(), 10)
             cluster.restart(2)
+            cluster.restart(0)
 
             # Verify everything is up to date and active
             def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
             assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
             assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
-            cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
-            cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+            cluster[1].assert_browse_backup("q1", [str(i) for i in range(10)+range(10,20)])
+            cluster[1].assert_browse_backup("q2", [str(i) for i in range(10,20)])
             cluster[3].wait_status("active"),
             s1.session.connection.close()
             s2.session.connection.close()
@@ -1164,8 +1155,7 @@ class RecoveryTests(HaBrokerTest):
         """If we join a cluster where the primary is dead, the new primary is
         not yet promoted and there are ready backups then we should refuse
         promotion so that one of the ready backups can be chosen."""
-        # FIXME aconway 2012-10-05: smaller timeout
-        cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+        cluster = HaCluster(self, 2)
         cluster[0].wait_status("active")
         cluster[1].wait_status("ready")
         cluster.bounce(0, promote_next=False)
@@ -1206,9 +1196,9 @@ class ConfigurationTests(HaBrokerTest):
         b = start("none", "none")
         check(b, "", "")
 
-
 class StoreTests(BrokerTest):
     """Test for HA with persistence."""
+
     def check_skip(self):
         if not BrokerTest.store_lib:
             print "WARNING: skipping HA+store tests, no store lib found."
@@ -1255,7 +1245,7 @@ class StoreTests(BrokerTest):
         doing catch-up from the primary."""
         if self.check_skip(): return
         cluster = HaCluster(self, 2)
-        sn = cluster[0].connect().session()
+        sn = cluster[0].connect(heartbeat=1).session()
         s1 = sn.sender("q1;{create:always,node:{durable:true}}")
         for m in ["foo","bar"]: s1.send(Message(m, durable=True))
         s2 = sn.sender("q2;{create:always,node:{durable:true}}")
@@ -1264,10 +1254,9 @@ class StoreTests(BrokerTest):
         # Wait for backup to catch up.
         cluster[1].assert_browse_backup("q1", ["foo","bar"])
         cluster[1].assert_browse_backup("q2", ["hello"])
-
         # Make changes that the backup doesn't see
-        cluster.kill(1, promote_next=False)
-        r1 = cluster[0].connect().session().receiver("q1")
+        cluster.kill(1, promote_next=False, final=False)
+        r1 = cluster[0].connect(heartbeat=1).session().receiver("q1")
         for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
         r1.session.acknowledge()
         for m in ["x","y","z"]: s1.send(Message(m, durable=True))
@@ -1285,7 +1274,8 @@ class StoreTests(BrokerTest):
         # Verify state
         cluster[0].assert_browse("q1",  ["x","y","z"])
         cluster[1].assert_browse_backup("q1",  ["x","y","z"])
-        sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
+
+        sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should
fail over!
         sn.sender("ex/k1").send("boo")
         cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
         cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1298,8 +1288,6 @@ if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     qpid_ha = os.getenv("QPID_HA_EXEC")
     if  qpid_ha and os.path.exists(qpid_ha):
-        os.execvp("qpid-python-test",
-                  ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
+        os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
     else:
         print "Skipping ha_tests, %s not available"%(qpid_ha)
-



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message