qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1149747 - /qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark
Date Fri, 22 Jul 2011 22:05:48 GMT
Author: aconway
Date: Fri Jul 22 22:05:47 2011
New Revision: 1149747

URL: http://svn.apache.org/viewvc?rev=1149747&view=rev
Log:
QPID-2920: Improvements to qpid-cpp-benchmark.

- fixed error message.
- add necessary waiting to deal with async wiring in the new cluster.

Modified:
    qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1149747&r1=1149746&r2=1149747&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark Fri Jul 22 22:05:47 2011
@@ -92,7 +92,7 @@ class Clients:
 clients = Clients()
 
 def start_receive(queue, index, opts, ready_queue, broker, host):
-    address_opts=["create:always"] + opts.receive_option
+    address_opts=opts.receive_option
     if opts.durable: address_opts += ["node:{durable:true}"]
     address="%s;{%s}"%(queue,",".join(address_opts))
     msg_total=opts.senders*opts.messages
@@ -115,7 +115,7 @@ def start_receive(queue, index, opts, re
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return clients.add(Popen(command, stdout=PIPE))
+    return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
 
 def start_send(queue, opts, broker, host):
     address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
@@ -136,34 +136,45 @@ def start_send(queue, opts, broker, host
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return clients.add(Popen(command, stdout=PIPE))
+    return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
 
-def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip()
+def error_msg(out, err):
+    return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
 
 def first_line(p):
     out,err=p.communicate()
-    if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err))
+    if p.returncode != 0:
+        raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
     return out.split("\n")[0]
 
-def delete_queues(queues, broker):
+def queue_exists(queue,broker):
     c = qpid.messaging.Connection(broker)
     c.open()
     s = c.session()
-    for q in queues:
+    try:
         try:
-            snd = s.sender("%s;{delete:always}"%(q))
-            snd.close()
-        except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
-    s.sync()
-    c.close()
+            s.sender(queue)
+            return True
+        except qpid.messaging.exceptions.NotFound:
+            return False
+    finally: c.close()
 
-def create_queues(queues, broker):
-    c = qpid.messaging.Connection(broker)
+def recreate_queues(queues, brokers):
+    c = qpid.messaging.Connection(brokers[0])
     c.open()
     s = c.session()
     for q in queues:
+        try: s.sender("%s;{delete:always}"%(q)).close()
+        except qpid.messaging.exceptions.NotFound: pass
+        # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+        for b in brokers:
+            while queue_exists(q,b): time.sleep(0.001);
+    for q in queues:
         s.sender("%s;{create:always}"%q)
-    s.sync()
+        # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+        for b in brokers:
+            while not queue_exists(q,b): time.sleep(0.001);
+    c.close()
 
 def print_header(timestamp):
     if timestamp: latency_header="\tl-min\tl-max\tl-avg"
@@ -250,10 +261,7 @@ def main():
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
     try:
         for i in xrange(opts.repeat):
-            delete_queues(queues, opts.broker[0])
-            time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
-            create_queues(queues, opts.broker[0])
-            time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
+            recreate_queues(queues, opts.broker)
             ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
             receivers = [start_receive(q, j, opts, ready_queue,
                                        brokers.next(), client_hosts.next())



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message