Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 27806 invoked from network); 7 Apr 2010 19:51:47 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Apr 2010 19:51:47 -0000 Received: (qmail 77048 invoked by uid 500); 7 Apr 2010 19:51:47 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 77028 invoked by uid 500); 7 Apr 2010 19:51:47 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 77021 invoked by uid 99); 7 Apr 2010 19:51:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Apr 2010 19:51:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Apr 2010 19:51:42 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B91D323889B3; Wed, 7 Apr 2010 19:51:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r931657 - in /qpid/trunk/qpid: cpp/src/tests/Makefile.am cpp/src/tests/Statistics.cpp cpp/src/tests/Statistics.h cpp/src/tests/qpid_recv.cpp cpp/src/tests/qpid_send.cpp doc/book/.gitignore Date: Wed, 07 Apr 2010 19:51:20 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100407195120.B91D323889B3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Wed Apr 7 19:51:20 2010 New Revision: 931657 URL: http://svn.apache.org/viewvc?rev=931657&view=rev Log: Extend qpid_send, qpid_recv to measure throughput and latency. Added: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp (with props) qpid/trunk/qpid/cpp/src/tests/Statistics.h (with props) qpid/trunk/qpid/doc/book/.gitignore Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=931657&r1=931656&r2=931657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Apr 7 19:51:20 2010 @@ -166,14 +166,16 @@ qpidtest_PROGRAMS += qpid_recv qpid_recv_SOURCES = \ qpid_recv.cpp \ TestOptions.h \ - ConnectionOptions.h + ConnectionOptions.h \ + Statistics.cpp qpid_recv_LDADD = $(lib_client) qpidtest_PROGRAMS += qpid_send qpid_send_SOURCES = \ qpid_send.cpp \ TestOptions.h \ - ConnectionOptions.h + ConnectionOptions.h \ + Statistics.cpp qpid_send_LDADD = $(lib_client) qpidtest_PROGRAMS+=perftest Added: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Statistics.cpp?rev=931657&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/Statistics.cpp (added) +++ qpid/trunk/qpid/cpp/src/tests/Statistics.cpp Wed Apr 7 19:51:20 2010 @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Statistics.h" +#include +#include + +namespace qpid { +namespace tests { + +Throughput::Throughput() : messages(0), started(false) {} + +void Throughput::message(const messaging::Message&) { + ++messages; + if (!started) { + start = sys::now(); + started = true; + } +} + +void Throughput::header(std::ostream& o) const { + o << "msg/sec"; +} + +void Throughput::report(std::ostream& o) const { + double elapsed(int64_t(sys::Duration(start, sys::now()))/double(sys::TIME_SEC)); + o << messages/elapsed; +} + +ThroughputAndLatency::ThroughputAndLatency() : + total(0), + min(std::numeric_limits::max()), + max(std::numeric_limits::min()) +{} + +void ThroughputAndLatency::message(const messaging::Message& m) { + Throughput::message(m); + types::Variant::Map::const_iterator i = m.getProperties().find("ts"); + if (i != m.getProperties().end()) { + int64_t start(i->second.asInt64()); + int64_t end(sys::Duration(sys::AbsTime::epoch(),sys::now())); + double latency = double(end - start)/sys::TIME_MSEC; + if (latency > 0) { + total += latency; + if (latency < min) min = latency; + if (latency > max) max = latency; + } + } +} + +void ThroughputAndLatency::header(std::ostream& o) const { + Throughput::header(o); + o << " latency(ms)min max avg"; +} + +void ThroughputAndLatency::report(std::ostream& o) const { + Throughput::report(o); + o << " "; + if (messages) + o << min << " " << max << " " << total/messages; + else + o << "Can't compute latency for 0 messages."; +} + +ReporterBase::ReporterBase(std::ostream& o, int batch) + : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) {} + +/** Count message in the statistics */ +void ReporterBase::message(const messaging::Message& m) { + if (!overall.get()) overall = create(); + overall->message(m); + if (wantBatch) { + if (!batch.get()) batch = create(); + batch->message(m); + if (++batchCount == wantBatch) { + header(); + batch->report(out); + out << std::endl; + batch = create(); + batchCount = 0; + } + } +} + +/** Print overall report. */ +void ReporterBase::report() { + header(); + overall->report(out); + out << std::endl; +} + +void ReporterBase::header() { + if (!headerPrinted) { + if (!overall.get()) overall = create(); + overall->header(out); + out << std::endl; + headerPrinted = true; + } +} + + +}} // namespace qpid::tests Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/trunk/qpid/cpp/src/tests/Statistics.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Statistics.h?rev=931657&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/Statistics.h (added) +++ qpid/trunk/qpid/cpp/src/tests/Statistics.h Wed Apr 7 19:51:20 2010 @@ -0,0 +1,106 @@ +#ifndef TESTS_STATISTICS_H +#define TESTS_STATISTICS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include +#include +#include +#include + +namespace qpid { + +namespace messaging { +class Message; +} + +namespace tests { + +class Statistic { + public: + virtual void message(const messaging::Message&) = 0; + virtual void report(std::ostream&) const = 0; + virtual void header(std::ostream&) const = 0; +}; + +class Throughput : public Statistic { + public: + Throughput(); + virtual void message(const messaging::Message&); + virtual void report(std::ostream&) const; + virtual void header(std::ostream&) const; + + protected: + int messages; + + private: + bool started; + sys::AbsTime start; +}; + +class ThroughputAndLatency : public Throughput { + public: + ThroughputAndLatency(); + virtual void message(const messaging::Message&); + virtual void report(std::ostream&) const; + virtual void header(std::ostream&) const; + + private: + double total, min, max; // Milliseconds +}; + +/** Report batch and overall statistics */ +class ReporterBase { + public: + /** Count message in the statistics */ + void message(const messaging::Message& m); + + /** Print overall report. */ + void report(); + + protected: + ReporterBase(std::ostream& o, int batchSize); + virtual std::auto_ptr create() = 0; + + private: + void header(); + void report(const Statistic& s); + std::auto_ptr overall; + std::auto_ptr batch; + bool wantOverall; + int wantBatch, batchCount; + bool stopped, headerPrinted; + std::ostream& out; +}; + +template class Reporter : public ReporterBase { + public: + Reporter(std::ostream& o, int batchSize) : ReporterBase(o, batchSize) {} + virtual std::auto_ptr create() { + return std::auto_ptr(new Stats); + } +}; + +}} // namespace qpid::tests + +#endif /*!TESTS_STATISTICS_H*/ Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=931657&r1=931656&r2=931657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Wed Apr 7 19:51:20 2010 @@ -29,6 +29,7 @@ #include #include #include "TestOptions.h" +#include "Statistics.h" #include #include @@ -56,9 +57,12 @@ struct Options : public qpid::Options uint ackFrequency; uint tx; uint rollbackFrequency; + bool printContent; bool printHeaders; bool failoverUpdates; qpid::log::Options log; + bool report; + uint reportEvery; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -68,13 +72,16 @@ struct Options : public qpid::Options forever(false), messages(0), ignoreDuplicates(false), - capacity(0), - ackFrequency(1), + capacity(10000), + ackFrequency(100), tx(0), rollbackFrequency(0), + printContent(true), printHeaders(false), failoverUpdates(false), - log(argv0) + log(argv0), + report(false), + reportEvery(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -84,12 +91,15 @@ struct Options : public qpid::Options ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") - ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") - ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content") + ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content") + ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers") ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report", qpid::optValue(report), "Report throughput statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -162,7 +172,9 @@ int main(int argc, char ** argv) SequenceTracker sequenceTracker; Duration timeout = opts.getTimeout(); bool done = false; + Reporter reporter(std::cout, opts.reportEvery); while (!done && receiver.fetch(msg, timeout)) { + reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { done = true; @@ -179,7 +191,8 @@ int main(int argc, char ** argv) std::cout << "Properties: " << msg.getProperties() << std::endl; std::cout << std::endl; } - std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages + if (opts.printContent) + std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) done = true; } } @@ -194,6 +207,7 @@ int main(int argc, char ** argv) } //opts.rejectFrequency?? } + if (opts.report) reporter.report(); if (opts.tx) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { session.rollback(); Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=931657&r1=931656&r2=931657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Wed Apr 7 19:51:20 2010 @@ -25,7 +25,10 @@ #include #include #include +#include +#include #include "TestOptions.h" +#include "Statistics.h" #include #include @@ -34,7 +37,6 @@ using namespace qpid::messaging; using namespace qpid::types; using qpid::client::amqp0_10::FailoverUpdates; - typedef std::vector string_vector; using namespace std; @@ -64,20 +66,26 @@ struct Options : public qpid::Options uint capacity; bool failoverUpdates; qpid::log::Options log; + bool report; + uint reportEvery; + uint rate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), url("amqp:tcp:127.0.0.1"), - count(1), + count(0), sendEos(0), durable(false), ttl(0), tx(0), rollbackFrequency(0), - capacity(0), + capacity(1000), failoverUpdates(false), - log(argv0) + log(argv0), + report(false), + reportEvery(0), + rate(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -87,17 +95,20 @@ struct Options : public qpid::Options ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") - ("content", qpid::optValue(content, "CONTENT"), "specify textual content") + ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin") ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report", qpid::optValue(report), "Report throughput statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages") + ("rate", qpid::optValue(rate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -177,6 +188,29 @@ const string EOS("eos"); using namespace qpid::tests; +class ContentGenerator { + public: + virtual bool getContent(std::string& content) = 0; +}; + +class GetlineContentGenerator : public ContentGenerator { + public: + virtual bool getContent(std::string& content) { return getline(std::cin, content); } +}; + +class FixedContentGenerator : public ContentGenerator { + public: + FixedContentGenerator(std::string s) : content(s) {} + virtual bool getContent(std::string& contentOut) { + contentOut = content; + return true; + } + private: + std::string content; +}; + + + int main(int argc, char ** argv) { Options opts; @@ -200,18 +234,41 @@ int main(int argc, char ** argv) std::string content; uint sent = 0; uint txCount = 0; - while (getline(std::cin, content)) { + Reporter reporter(std::cout, opts.reportEvery); + + std::auto_ptr contentGen; + if (!opts.content.empty()) + contentGen.reset(new FixedContentGenerator(opts.content)); + else + contentGen.reset(new GetlineContentGenerator); + + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate; + + while (contentGen->getContent(content)) { msg.setContent(content); msg.getProperties()["sn"] = ++sent; + msg.getProperties()["ts"] = int64_t( + qpid::sys::Duration(qpid::sys::AbsTime::epoch(), qpid::sys::now())); sender.send(msg); + reporter.message(msg); if (opts.tx && (sent % opts.tx == 0)) { - if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + if (opts.rollbackFrequency && + (++txCount % opts.rollbackFrequency == 0)) session.rollback(); - } else { + else session.commit(); - } - } + } + if (opts.count && sent >= opts.count) break; + if (opts.rate) { + qpid::sys::AbsTime waitTill(start, sent*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) + qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } } + if (opts.report) reporter.report(); for (uint i = opts.sendEos; i > 0; --i) { msg.getProperties()["sn"] = ++sent; msg.setContent(EOS);//TODO: add in ability to send digest or similar Added: qpid/trunk/qpid/doc/book/.gitignore URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/.gitignore?rev=931657&view=auto ============================================================================== --- qpid/trunk/qpid/doc/book/.gitignore (added) +++ qpid/trunk/qpid/doc/book/.gitignore Wed Apr 7 19:51:20 2010 @@ -0,0 +1,2 @@ +/build +/out --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org