qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1165877 - /qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark
Date Tue, 06 Sep 2011 21:46:05 GMT
Author: aconway
Date: Tue Sep  6 21:46:05 2011
New Revision: 1165877

URL: http://svn.apache.org/viewvc?rev=1165877&view=rev
Log:
QPID-2920: minor improvements to qpid-cpp-benchmark

Re-arranged queue creation/deletion to avoid wiring races.

Modified:
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1165877&r1=1165876&r2=1165877&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Sep  6 21:46:05 2011
@@ -92,7 +92,7 @@ class Clients:
 clients = Clients()
 
 def start_receive(queue, index, opts, ready_queue, broker, host):
-    address_opts=["create:receiver"] + opts.receive_option
+    address_opts=["create:always"] + opts.receive_option
     if opts.durable: address_opts += ["node:{durable:true}"]
     address="%s;{%s}"%(queue,",".join(address_opts))
     msg_total=opts.senders*opts.messages
@@ -108,7 +108,7 @@ def start_receive(queue, index, opts, re
                "--receive-rate", str(opts.receive_rate),
                "--report-total",
                "--ack-frequency", str(opts.ack_frequency),
-               "--ready-address", ready_queue,
+               "--ready-address", "%s;{create:always}"%ready_queue,
                "--report-header=no"
                ]
     command += opts.receive_arg
@@ -118,7 +118,7 @@ def start_receive(queue, index, opts, re
     return clients.add(Popen(command, stdout=PIPE))
 
 def start_send(queue, opts, broker, host):
-    address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:receiver"]))
+    address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
     command = ["qpid-send",
                "-b", broker,
                "-a", address,
@@ -138,23 +138,33 @@ def start_send(queue, opts, broker, host
     if host: command = ssh_command(host, command)
     return clients.add(Popen(command, stdout=PIPE))
 
+def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip()
+
 def first_line(p):
     out,err=p.communicate()
-    if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
+    if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err))
     return out.split("\n")[0]
 
 def delete_queues(queues, broker):
     c = qpid.messaging.Connection(broker)
     c.open()
+    s = c.session()
     for q in queues:
         try:
-            s = c.session()
             snd = s.sender("%s;{delete:always}"%(q))
             snd.close()
-            s.sync()
         except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
+    s.sync()
     c.close()
 
+def create_queues(queues, broker):
+    c = qpid.messaging.Connection(broker)
+    c.open()
+    s = c.session()
+    for q in queues:
+        s.sender("%s;{create:always}"%q)
+    s.sync()
+
 def print_header(timestamp):
     if timestamp: latency_header="\tl-min\tl-max\tl-avg"
     else: latency_header=""
@@ -196,7 +206,6 @@ def print_summary(send_stats, recv_stats
 class ReadyReceiver:
     """A receiver for ready messages"""
     def __init__(self, queue, broker):
-        delete_queues([queue], broker)
         self.connection = qpid.messaging.Connection(broker)
         self.connection.open()
         self.receiver = self.connection.session().receiver(
@@ -212,7 +221,7 @@ class ReadyReceiver:
             for r in receivers:
                 if (r.poll() is not None):
                     out,err=r.communicate()
-                    raise Exception("Receiver error: %s"%(out))
+                    raise Exception("Receiver error: %s"%error_msg(out,err))
             raise Exception("Timed out waiting for receivers to be ready")
 
 def flatten(l):
@@ -243,8 +252,10 @@ def main():
     try:
         for i in xrange(opts.repeat):
             delete_queues(queues, opts.broker[0])
+            time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
+            create_queues(queues, opts.broker[0])
+            time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
             ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-            time.sleep(.1) # FIXME aconway 2011-03-16: new cluster async wiring
             receivers = [start_receive(q, j, opts, ready_queue,
                                        brokers.next(), client_hosts.next())
                          for q in queues for j in xrange(opts.receivers)]



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message