qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r931657 - in /qpid/trunk/qpid: cpp/src/tests/Makefile.am cpp/src/tests/Statistics.cpp cpp/src/tests/Statistics.h cpp/src/tests/qpid_recv.cpp cpp/src/tests/qpid_send.cpp doc/book/.gitignore
Date Wed, 07 Apr 2010 19:51:20 GMT
Author: aconway
Date: Wed Apr  7 19:51:20 2010
New Revision: 931657

URL: http://svn.apache.org/viewvc?rev=931657&view=rev
Log:
Extend qpid_send, qpid_recv to measure throughput and latency.

Added:
    qpid/trunk/qpid/cpp/src/tests/Statistics.cpp   (with props)
    qpid/trunk/qpid/cpp/src/tests/Statistics.h   (with props)
    qpid/trunk/qpid/doc/book/.gitignore
Modified:
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=931657&r1=931656&r2=931657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Apr  7 19:51:20 2010
@@ -166,14 +166,16 @@ qpidtest_PROGRAMS += qpid_recv
 qpid_recv_SOURCES = \
   qpid_recv.cpp \
   TestOptions.h \
-  ConnectionOptions.h
+  ConnectionOptions.h \
+  Statistics.cpp
 qpid_recv_LDADD = $(lib_client)
 
 qpidtest_PROGRAMS += qpid_send
 qpid_send_SOURCES = \
   qpid_send.cpp \
   TestOptions.h \
-  ConnectionOptions.h
+  ConnectionOptions.h \
+  Statistics.cpp
 qpid_send_LDADD = $(lib_client)
 
 qpidtest_PROGRAMS+=perftest

Added: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Statistics.cpp?rev=931657&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Statistics.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/Statistics.cpp Wed Apr  7 19:51:20 2010
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Statistics.h"
+#include <qpid/messaging/Message.h>
+#include <ostream>
+
+namespace qpid {
+namespace tests {
+
+Throughput::Throughput() : messages(0), started(false) {}
+
+void Throughput::message(const messaging::Message&) {
+    ++messages;
+    if (!started) {
+        start = sys::now();
+        started = true;
+    }
+}
+
+void Throughput::header(std::ostream& o) const {
+    o << "msg/sec";
+}
+
+void Throughput::report(std::ostream& o) const {
+    double elapsed(int64_t(sys::Duration(start, sys::now()))/double(sys::TIME_SEC));
+    o << messages/elapsed;
+}
+
+ThroughputAndLatency::ThroughputAndLatency() :
+    total(0),
+    min(std::numeric_limits<double>::max()),
+    max(std::numeric_limits<double>::min())
+{}
+
+void ThroughputAndLatency::message(const messaging::Message& m) {
+    Throughput::message(m);
+    types::Variant::Map::const_iterator i = m.getProperties().find("ts");
+    if (i != m.getProperties().end()) {
+        int64_t start(i->second.asInt64());
+        int64_t end(sys::Duration(sys::AbsTime::epoch(),sys::now()));
+        double latency = double(end - start)/sys::TIME_MSEC;
+        if (latency > 0) {
+            total += latency;
+            if (latency < min) min = latency;
+            if (latency > max) max = latency;
+        }
+    }
+}
+
+void ThroughputAndLatency::header(std::ostream& o) const {
+    Throughput::header(o);
+    o << "  latency(ms)min max avg";
+}
+
+void ThroughputAndLatency::report(std::ostream& o) const {
+    Throughput::report(o);
+    o << "  ";
+    if (messages)
+        o << min << " "  << max << " " << total/messages;
+    else
+        o << "Can't compute latency for 0 messages.";
+}
+
+ReporterBase::ReporterBase(std::ostream& o, int batch)
+    : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) {}
+
+/** Count message in the statistics */
+void ReporterBase::message(const messaging::Message& m) {
+    if (!overall.get()) overall = create();
+    overall->message(m);
+    if (wantBatch) {
+        if (!batch.get()) batch = create();
+        batch->message(m);
+        if (++batchCount == wantBatch) {
+            header();
+            batch->report(out);
+            out << std::endl;
+            batch = create();
+            batchCount = 0;
+        }
+    }
+}
+
+/** Print overall report. */
+void ReporterBase::report() {
+    header();
+    overall->report(out);
+    out << std::endl;
+}
+
+void ReporterBase::header() {
+    if (!headerPrinted) {
+        if (!overall.get()) overall = create();
+        overall->header(out);
+        out << std::endl;
+        headerPrinted = true;
+    }
+}
+
+
+}} // namespace qpid::tests

Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/tests/Statistics.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Statistics.h?rev=931657&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Statistics.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/Statistics.h Wed Apr  7 19:51:20 2010
@@ -0,0 +1,106 @@
+#ifndef TESTS_STATISTICS_H
+#define TESTS_STATISTICS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <qpid/sys/Time.h>
+#include <limits>
+#include <iosfwd>
+#include <memory>
+
+namespace qpid {
+
+namespace messaging {
+class Message;
+}
+
+namespace tests {
+
+class Statistic {
+  public:
+    virtual void message(const messaging::Message&) = 0;
+    virtual void report(std::ostream&) const = 0;
+    virtual void header(std::ostream&) const = 0;
+};
+
+class Throughput : public Statistic {
+  public:
+    Throughput();
+    virtual void message(const messaging::Message&);
+    virtual void report(std::ostream&) const;
+    virtual void header(std::ostream&) const;
+
+  protected:
+    int messages;
+
+  private:
+    bool started;
+    sys::AbsTime start;
+};
+
+class ThroughputAndLatency : public Throughput {
+  public:
+    ThroughputAndLatency();
+    virtual void message(const messaging::Message&);
+    virtual void report(std::ostream&) const;
+    virtual void header(std::ostream&) const;
+
+  private:
+    double total, min, max;     // Milliseconds
+};
+
+/** Report batch and overall statistics */
+class ReporterBase {
+  public:
+    /** Count message in the statistics */
+    void message(const messaging::Message& m);
+
+    /** Print overall report. */
+    void report();
+
+  protected:
+    ReporterBase(std::ostream& o, int batchSize);
+    virtual std::auto_ptr<Statistic> create() = 0;
+
+  private:
+    void header();
+    void report(const Statistic& s);
+    std::auto_ptr<Statistic> overall;
+    std::auto_ptr<Statistic> batch;
+    bool wantOverall;
+    int wantBatch, batchCount;
+    bool stopped, headerPrinted;
+    std::ostream& out;
+};
+
+template <class Stats> class Reporter : public ReporterBase {
+  public:
+    Reporter(std::ostream& o, int batchSize) : ReporterBase(o, batchSize) {}
+    virtual std::auto_ptr<Statistic> create() {
+        return std::auto_ptr<Statistic>(new Stats);
+    }
+};
+
+}} // namespace qpid::tests
+
+#endif  /*!TESTS_STATISTICS_H*/

Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/Statistics.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=931657&r1=931656&r2=931657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Wed Apr  7 19:51:20 2010
@@ -29,6 +29,7 @@
 #include <qpid/log/Options.h>
 #include <qpid/client/amqp0_10/FailoverUpdates.h>
 #include "TestOptions.h"
+#include "Statistics.h"
 
 #include <iostream>
 #include <memory>
@@ -56,9 +57,12 @@ struct Options : public qpid::Options
     uint ackFrequency;
     uint tx;
     uint rollbackFrequency;
+    bool printContent;
     bool printHeaders;
     bool failoverUpdates;
     qpid::log::Options log;
+    bool report;
+    uint reportEvery;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -68,13 +72,16 @@ struct Options : public qpid::Options
           forever(false),
           messages(0),
           ignoreDuplicates(false),
-          capacity(0),
-          ackFrequency(1),
+          capacity(10000),
+          ackFrequency(100),
           tx(0),
           rollbackFrequency(0),
+          printContent(true),
           printHeaders(false),
           failoverUpdates(false),
-          log(argv0)
+          log(argv0),
+          report(false),
+          reportEvery(0)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -84,12 +91,15 @@ struct Options : public qpid::Options
             ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
             ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0
means receive indefinitely")
             ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates
(by checking 'sn' header)")
-            ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite
window)")
+            ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
             ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies
none of the messages will get accepted)")
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction
are not used)")
             ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency
(0 implies no transaction will be rolledback)")
-            ("print-headers", qpid::optValue(printHeaders), "If specified print out all message
headers as well as content")
+            ("print-content", qpid::optValue(printContent, "yes|no"), "print out message
content")
+            ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message
headers")
             ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership
updates distributed via amq.failover")
+            ("report", qpid::optValue(report), "Report throughput statistics")
+            ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency
statistics every N messages.")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -162,7 +172,9 @@ int main(int argc, char ** argv)
             SequenceTracker sequenceTracker;
             Duration timeout = opts.getTimeout();
             bool done = false;
+            Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);
             while (!done && receiver.fetch(msg, timeout)) {
+                reporter.message(msg);
                 if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
                     if (msg.getContent() == EOS) {
                         done = true;
@@ -179,7 +191,8 @@ int main(int argc, char ** argv)
                             std::cout << "Properties: " << msg.getProperties()
<< std::endl;
                             std::cout << std::endl;
                         }
-                        std::cout << msg.getContent() << std::endl;//TODO: handle
map or list messages
+                        if (opts.printContent)
+                            std::cout << msg.getContent() << std::endl;//TODO:
handle map or list messages
                         if (opts.messages && count >= opts.messages) done = true;
                     }
                 }
@@ -194,6 +207,7 @@ int main(int argc, char ** argv)
                 }
                 //opts.rejectFrequency??
             }
+            if (opts.report) reporter.report();
             if (opts.tx) {
                 if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency
== 0)) {
                     session.rollback();

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=931657&r1=931656&r2=931657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Wed Apr  7 19:51:20 2010
@@ -25,7 +25,10 @@
 #include <qpid/messaging/Sender.h>
 #include <qpid/messaging/Session.h>
 #include <qpid/client/amqp0_10/FailoverUpdates.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Monitor.h>
 #include "TestOptions.h"
+#include "Statistics.h"
 
 #include <fstream>
 #include <iostream>
@@ -34,7 +37,6 @@
 using namespace qpid::messaging;
 using namespace qpid::types;
 using qpid::client::amqp0_10::FailoverUpdates;
-
 typedef std::vector<std::string> string_vector;
 
 using namespace std;
@@ -64,20 +66,26 @@ struct Options : public qpid::Options
     uint capacity;
     bool failoverUpdates;
     qpid::log::Options log;
+    bool report;
+    uint reportEvery;
+    uint rate;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
           help(false),
           url("amqp:tcp:127.0.0.1"),
-          count(1),
+          count(0),
           sendEos(0),
           durable(false),
           ttl(0),
           tx(0),
           rollbackFrequency(0),
-          capacity(0),
+          capacity(1000),
           failoverUpdates(false),
-          log(argv0)
+          log(argv0),
+          report(false),
+          reportEvery(0),
+          rate(0)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -87,17 +95,20 @@ struct Options : public qpid::Options
             ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating
one")
             ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
             ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of
input")
-            ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+            ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
 	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
             ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
             ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
-            ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+            ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content
instead of reading from stdin")
             ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message
queue")
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction
are not used)")
             ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency
(0 implies no transaction will be rolledback)")
             ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership
updates distributed via amq.failover")
+            ("report", qpid::optValue(report), "Report throughput statistics")
+            ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics
every N messages")
+            ("rate", qpid::optValue(rate,"N"), "Send at rate of N messages/second. 0 means
send as fast as possible.")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -177,6 +188,29 @@ const string EOS("eos");
 
 using namespace qpid::tests;
 
+class ContentGenerator {
+  public:
+    virtual bool getContent(std::string& content) = 0;
+};
+
+class GetlineContentGenerator : public ContentGenerator {
+  public:
+    virtual bool getContent(std::string& content) { return getline(std::cin, content);
}
+};
+
+class FixedContentGenerator   : public ContentGenerator {
+  public:
+    FixedContentGenerator(std::string s) : content(s) {}
+    virtual bool getContent(std::string& contentOut) {
+        contentOut = content;
+        return true;
+    }
+  private:
+    std::string content;
+};
+
+
+
 int main(int argc, char ** argv)
 {
     Options opts;
@@ -200,18 +234,41 @@ int main(int argc, char ** argv)
             std::string content;
             uint sent = 0;
             uint txCount = 0;
-            while (getline(std::cin, content)) {
+            Reporter<Throughput> reporter(std::cout, opts.reportEvery);
+
+            std::auto_ptr<ContentGenerator> contentGen;
+            if (!opts.content.empty())
+                contentGen.reset(new FixedContentGenerator(opts.content));
+            else
+                contentGen.reset(new GetlineContentGenerator);
+
+            qpid::sys::AbsTime start = qpid::sys::now();
+            int64_t interval = 0;
+            if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate;
+
+            while (contentGen->getContent(content)) {
                 msg.setContent(content);
                 msg.getProperties()["sn"] = ++sent;
+                msg.getProperties()["ts"] = int64_t(
+                    qpid::sys::Duration(qpid::sys::AbsTime::epoch(), qpid::sys::now()));
                 sender.send(msg);
+                reporter.message(msg);
                 if (opts.tx && (sent % opts.tx == 0)) {
-                    if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency
== 0)) {
+                    if (opts.rollbackFrequency &&
+                        (++txCount % opts.rollbackFrequency == 0))
                         session.rollback();
-                    } else {
+                    else
                         session.commit();
-                    }
-                }                
+                }
+                if (opts.count && sent >= opts.count) break;
+                if (opts.rate) {
+                    qpid::sys::AbsTime waitTill(start, sent*interval);
+                    int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+                    if (delay > 0)
+                        qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+                }
             }
+            if (opts.report) reporter.report();
             for (uint i = opts.sendEos; i > 0; --i) {
                 msg.getProperties()["sn"] = ++sent;
                 msg.setContent(EOS);//TODO: add in ability to send digest or similar

Added: qpid/trunk/qpid/doc/book/.gitignore
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/.gitignore?rev=931657&view=auto
==============================================================================
--- qpid/trunk/qpid/doc/book/.gitignore (added)
+++ qpid/trunk/qpid/doc/book/.gitignore Wed Apr  7 19:51:20 2010
@@ -0,0 +1,2 @@
+/build
+/out



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message