qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r593321 - in /incubator/qpid/trunk/qpid/cpp/src/tests: perfdist perftest.cpp
Date Thu, 08 Nov 2007 21:16:03 GMT
Author: aconway
Date: Thu Nov  8 13:16:02 2007
New Revision: 593321

URL: http://svn.apache.org/viewvc?rev=593321&view=rev
Log:

src/tests/perfdist: Use ssh to run distributed perftest.
  Run ./perfdist with no arguments for usage.

src/tests/perftest.cpp:
 --summary gives one-line report, can cut/paste into spreadsheet for multiple runs.
 --purge as initial clean-up step for distributed tests (see perfdist)

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/perfdist   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Added: incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perfdist?rev=593321&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perfdist (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perfdist Thu Nov  8 13:16:02 2007
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+echo $_
+usage() {
+cat <<EOF
+Distributed perftest with ssh.
+Arguments before -- are passed to perftest.
+Arguments after -- are treated as host names.
+Publisher runs on first host. One listener runs on each 
+Remaining host. To run multiple liseners on a host, list it more than once.
+Do not pass --consumers, --publish or --listen options, they are computed
+from the host list.
+EOF
+exit 1
+}
+
+while test "$HOSTS" != "$*"; do
+    case $1 in
+	--consumers|--publish|--listen) usage ;;
+	--) shift; PUB="$1" ; shift ; HOSTS="$*" ;;
+	*) ARGS="$ARGS $1"; shift ;;
+    esac
+done
+test -n "$HOSTS" || { echo "No -- found"; usage; }
+N=`echo $HOSTS | wc -w`
+PERFTEST=`PATH=$PWD:$PATH which perftest`
+$PERFTEST --purge $ARGS
+ssh $PUB $PERFTEST $ARGS --publish --consumers $N&
+for h in $HOSTS; do
+    ssh $h $PERFTEST $ARGS --listen&
+done
+wait
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=593321&r1=593320&r2=593321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Thu Nov  8 13:16:02 2007
@@ -39,26 +39,32 @@
 
     bool listen;
     bool publish;
+    bool purge;
     int count;
     int size;
     bool durable;
     int consumers;
     std::string mode;
     int autoAck;
+    bool summary;
     
     Opts() :
-        listen(false), publish(false), count(500000), size(64), consumers(1),
-        mode("shared"), autoAck(100)
+        listen(false), publish(false), purge(false),
+        count(500000), size(64), consumers(1),
+        mode("shared"), autoAck(100),
+        summary(false)
     {
         addOptions() 
             ("listen", optValue(listen), "Consume messages.")
             ("publish", optValue(publish), "Produce messages.")
+            ("purge", optValue(purge), "Purge shared queues.")
             ("count", optValue(count, "N"), "Messages to send.")
             ("size", optValue(size, "BYTES"), "Size of messages.")
             ("durable", optValue(durable, "N"), "Publish messages as durable.")
             ("consumers", optValue(consumers, "N"), "Number of consumers.")
             ("mode", optValue(mode, "shared|fanout|topic"), "consume mode")
-            ("auto-ack", optValue(autoAck, "N"), "ack every N messages.");
+            ("auto-ack", optValue(autoAck, "N"), "ack every N messages.")
+            ("summary,s", optValue(summary), "summary output only");
     }
 };
 
@@ -71,16 +77,19 @@
 
 // Create and purge the shared queues 
 void setup() {
-    cout << "Create shared queues" << endl;
     Connection connection;
     opts.open(connection);
     Session_0_10 session = connection.newSession();
     session.setSynchronous(true); // Make sure this is all completed.
     session.queueDeclare(arg::queue="control"); // Control queue
-    session.queuePurge(arg::queue="control");
+    if (opts.purge) {
+        if (!opts.summary) cout << "Purging shared queues" << endl;
+        session.queuePurge(arg::queue="control");
+    }
     if (mode==SHARED) {
         session.queueDeclare(arg::queue="perftest", arg::durable=opts.durable); // Shared data queue
-        session.queuePurge(arg::queue="perftest");
+        if (opts.purge)		
+            session.queuePurge(arg::queue="perftest");
     }
     session.close();
     connection.close();
@@ -93,8 +102,8 @@
         else if (opts.mode=="fanout") mode = FANOUT;
         else if (opts.mode=="topic") mode = TOPIC;
         else throw Exception("Invalid mode");
-        if (!opts.listen && !opts.publish)
-            opts.listen = opts.publish = true;
+        if (!opts.listen && !opts.publish && !opts.purge)
+            opts.listen = opts.publish = opts.purge = true;
         setup();
         std::vector<ListenThread> listen(opts.consumers);
         PublishThread publish;
@@ -141,15 +150,15 @@
         Session_0_10 session = connection.newSession();
 
         // Wait for consumers.
-        cout << "Publisher wating for consumers " << flush;
+        if (!opts.summary) cout << "Waiting for consumers ready " << flush;
         SubscriptionManager subs(session);
         LocalQueue control;
         subs.subscribe(control, "control");
         for (int i = 0; i < opts.consumers; ++i) {
-            cout << "." << flush;
+            if (!opts.summary) cout << "." << flush;
             expect(control.pop().getData(), "ready");
         }
-        cout << endl;
+        if (!opts.summary) cout << endl;
 
         // Create test message
         size_t msgSize=max(opts.size, 32);
@@ -159,14 +168,14 @@
 	    msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
         // Time sending message.
         AbsTime start=now();
-        cout << "Publishing " << opts.count << " messages " << flush;
+        if (!opts.summary) cout << "Publishing " << opts.count << " messages " << flush;
         for (int i=0; i<opts.count; i++) {
             sprintf(msgBuf, "%d", i);
             session.messageTransfer(arg::destination=exchange(),
                                     arg::content=msg);
-            if ((i%10000)==0) cout << "." << flush;
+            if (!opts.summary && (i%10000)==0) cout << "." << flush;
         }
-        cout << " done." << endl;
+        if (!opts.summary) cout << " done." << endl;
         msg.setData("done");    // Send done messages.
         if (mode==SHARED)
             for (int i = 0; i < opts.consumers; ++i)
@@ -176,18 +185,24 @@
         AbsTime end=now();
 
         // Report
-        cout << endl;
-        cout << "publish count:" << opts.count << endl;
-        cout << "publish secs:" << secs(start,end) << endl;
-        cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
+        double publish_rate=(opts.count)/secs(start,end);
+        if (!opts.summary)
+            cout << endl
+                 << "publish count:" << opts.count << endl
+                 << "publish secs:" << secs(start,end) << endl
+                 << "publish rate:" << publish_rate << endl;
 
+        double consume_rate = 0; // Average rate for consumers.
         //  Wait for consumer(s) to finish.
-        cout << "Publisher wating for consumer reports. " << endl;
+        if (!opts.summary) cout << "Waiting for consumers done " << endl;
         for (int i = 0; i < opts.consumers; ++i) {
             string report=control.pop().getData();
-            if (report.find("consume") != 0)
-                throw Exception("Expected consumer report, got: "+report);
-            cout << endl << report;
+            if (!opts.summary)
+                cout << endl << report;
+            else {
+                double rate=boost::lexical_cast<double>(report);
+                consume_rate += rate/opts.consumers;
+            }
         }
         end=now();
 
@@ -197,11 +212,19 @@
             transfers=2*opts.count; 
         else                    // sent once, received N times.
             transfers=opts.count*(opts.consumers + 1);
-        
-        cout << endl
-             << "total transfers:" << transfers << endl
-             << "total secs:" << secs(start, end) << endl
-             << "total transfers/sec:" << transfers/secs(start, end) << endl;
+        double total_rate=transfers/secs(start, end);
+        if (opts.summary)
+            cout << opts.mode << '(' << opts.count
+                 << ':' << opts.consumers << ')'
+                 << '\t' << publish_rate
+                 << '\t' << consume_rate
+                 << '\t' << total_rate
+                 << endl;
+        else
+            cout << endl
+                 << "total transfers:" << transfers << endl
+                 << "total secs:" << secs(start, end) << endl
+                 << "total rate:" << total_rate << endl;
 		
         connection.close();
     }
@@ -238,23 +261,22 @@
         int consumed=0;
         AbsTime start=now();
         Message msg;
-        if (!opts.publish)
-            cout << "Consuming " << flush;
         while ((msg=consume.pop()).getData() != "done") {
             ++consumed;
-            if (!opts.publish && (consumed%10000) == 0)
-                cout << "." << flush;
         }
-        if (!opts.publish)
-            cout << endl;
         msg.acknowledge();      // Ack all outstanding messages.
         AbsTime end=now();
 
         // Report to publisher.
         ostringstream report;
-        report << "consume count: " << consumed << endl
-               << "consume secs: " << secs(start, end) << endl
-               << "consume rate: " << consumed/secs(start,end) << endl;
+        double consume_rate=consumed/secs(start,end);
+        if (opts.summary)
+            report << consume_rate;
+        else
+            report << "consume count: " << consumed << endl
+                   << "consume secs: " << secs(start, end) << endl
+                   << "consume rate: " << consume_rate << endl;
+        
         session.messageTransfer(arg::content=Message(report.str(), "control"));
         connection.close();
     }



Mime
View raw message