Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A16FF774D for ; Tue, 6 Sep 2011 21:46:27 +0000 (UTC) Received: (qmail 53194 invoked by uid 500); 6 Sep 2011 21:46:27 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 53173 invoked by uid 500); 6 Sep 2011 21:46:27 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 53166 invoked by uid 99); 6 Sep 2011 21:46:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Sep 2011 21:46:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Sep 2011 21:46:26 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id EBEF12388B45 for ; Tue, 6 Sep 2011 21:46:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1165877 - /qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark Date: Tue, 06 Sep 2011 21:46:05 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110906214605.EBEF12388B45@eris.apache.org> Author: aconway Date: Tue Sep 6 21:46:05 2011 New Revision: 1165877 URL: http://svn.apache.org/viewvc?rev=1165877&view=rev Log: QPID-2920: minor improvements to qpid-cpp-benchmark Re-arranged queue creation/deletion to avoid wiring races. Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1165877&r1=1165876&r2=1165877&view=diff ============================================================================== --- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark (original) +++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Sep 6 21:46:05 2011 @@ -92,7 +92,7 @@ class Clients: clients = Clients() def start_receive(queue, index, opts, ready_queue, broker, host): - address_opts=["create:receiver"] + opts.receive_option + address_opts=["create:always"] + opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages @@ -108,7 +108,7 @@ def start_receive(queue, index, opts, re "--receive-rate", str(opts.receive_rate), "--report-total", "--ack-frequency", str(opts.ack_frequency), - "--ready-address", ready_queue, + "--ready-address", "%s;{create:always}"%ready_queue, "--report-header=no" ] command += opts.receive_arg @@ -118,7 +118,7 @@ def start_receive(queue, index, opts, re return clients.add(Popen(command, stdout=PIPE)) def start_send(queue, opts, broker, host): - address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:receiver"])) + address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) command = ["qpid-send", "-b", broker, "-a", address, @@ -138,23 +138,33 @@ def start_send(queue, opts, broker, host if host: command = ssh_command(host, command) return clients.add(Popen(command, stdout=PIPE)) +def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip() + def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip())) + if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err)) return out.split("\n")[0] def delete_queues(queues, broker): c = qpid.messaging.Connection(broker) c.open() + s = c.session() for q in queues: try: - s = c.session() snd = s.sender("%s;{delete:always}"%(q)) snd.close() - s.sync() except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" + s.sync() c.close() +def create_queues(queues, broker): + c = qpid.messaging.Connection(broker) + c.open() + s = c.session() + for q in queues: + s.sender("%s;{create:always}"%q) + s.sync() + def print_header(timestamp): if timestamp: latency_header="\tl-min\tl-max\tl-avg" else: latency_header="" @@ -196,7 +206,6 @@ def print_summary(send_stats, recv_stats class ReadyReceiver: """A receiver for ready messages""" def __init__(self, queue, broker): - delete_queues([queue], broker) self.connection = qpid.messaging.Connection(broker) self.connection.open() self.receiver = self.connection.session().receiver( @@ -212,7 +221,7 @@ class ReadyReceiver: for r in receivers: if (r.poll() is not None): out,err=r.communicate() - raise Exception("Receiver error: %s"%(out)) + raise Exception("Receiver error: %s"%error_msg(out,err)) raise Exception("Timed out waiting for receivers to be ready") def flatten(l): @@ -243,8 +252,10 @@ def main(): try: for i in xrange(opts.repeat): delete_queues(queues, opts.broker[0]) + time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring + create_queues(queues, opts.broker[0]) + time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - time.sleep(.1) # FIXME aconway 2011-03-16: new cluster async wiring receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org