qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r595115 - /incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Date Wed, 14 Nov 2007 23:05:50 GMT
Author: aconway
Date: Wed Nov 14 15:05:49 2007
New Revision: 595115

URL: http://svn.apache.org/viewvc?rev=595115&view=rev
Log:

perftest.cpp
 - Remove per-message heap allocation in the publish loop.
 - Verify sequence numbers in message data.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=595115&r1=595114&r2=595115&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov 14 15:05:49 2007
@@ -41,12 +41,12 @@
     bool listen;
     bool publish;
     bool purge;
-    int count;
-    int size;
+    size_t count;
+    size_t size;
     bool durable;
-    int consumers;
+    size_t consumers;
     std::string mode;
-    int autoAck;
+    size_t autoAck;
     bool summary;
     
     Opts() :
@@ -109,12 +109,12 @@
         std::vector<ListenThread> listen(opts.consumers);
         PublishThread publish;
         if (opts.listen) 
-            for (int i = 0; i < opts.consumers; ++i)
+            for (size_t i = 0; i < opts.consumers; ++i)
                 listen[i].thread=Thread(listen[i]);
         if (opts.publish)
             publish.thread=Thread(publish);
         if (opts.listen)
-            for (int i = 0; i < opts.consumers; ++i)
+            for (size_t i = 0; i < opts.consumers; ++i)
                 listen[i].thread.join();
         if (opts.publish)
             publish.thread.join();
@@ -155,26 +155,25 @@
         SubscriptionManager subs(session);
         LocalQueue control;
         subs.subscribe(control, "control");
-        for (int i = 0; i < opts.consumers; ++i) {
+        for (size_t i = 0; i < opts.consumers; ++i) {
             if (!opts.summary) cout << "." << flush;
             expect(control.pop().getData(), "ready");
         }
         if (!opts.summary) cout << endl;
 
-        // Create test message
-        size_t msgSize=max(opts.size, 32);
-        char* msgBuf = new char[msgSize];
-		memset(msgBuf,'X', msgSize);
-
-        Message msg(string(), "perftest");
+        size_t msgSize=max(opts.size, sizeof(size_t));
+        Message msg(string(msgSize, 'X'), "perftest");
         if (opts.durable)
 	    msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
-        // Time sending message.
+
         AbsTime start=now();
-        if (!opts.summary) cout << "Publishing " << opts.count << " messages " << flush;
-        for (int i=0; i<opts.count; i++) {
-            sprintf(msgBuf, "%d", i);
-            msg.setData(string(msgBuf,msgSize));
+        if (!opts.summary) cout << "Publishing " << opts.count
+                                << " messages " << flush;
+        for (size_t i=0; i<opts.count; i++) {
+            // Stamp the iteration into the message data, careful to avoid
+            // any heap allocation.
+            char* data = const_cast<char*>(msg.getData().data());
+            *reinterpret_cast<uint32_t*>(data) = i;
             session.messageTransfer(arg::destination=exchange(),
                                     arg::content=msg);
             if (!opts.summary && (i%10000)==0){
@@ -182,13 +181,12 @@
                  session.execution().sendSyncRequest();
             }
         }
-        delete [] msgBuf;
 		
         //Completion compl;
         if (!opts.summary) cout << " done." << endl;
         msg.setData("done");    // Send done messages.
         if (mode==SHARED)
-            for (int i = 0; i < opts.consumers; ++i)
+            for (size_t i = 0; i < opts.consumers; ++i)
                  session.messageTransfer(arg::destination=exchange(), arg::content=msg);
         else
             session.messageTransfer(arg::destination=exchange(), arg::content=msg);
@@ -203,20 +201,10 @@
                  << "publish secs:" << secs(start,end) << endl
                  << "publish rate:" << publish_rate << endl;
         
-        
-
-        // Report
-//        end=now(); //compl.wait();  (wait for publish confirm of write if durable)
-//        publish_rate=(opts.count)/secs(start,end);
-//        if (!opts.summary)
-//            cout << endl
-//                 << "synced secs:" << secs(start,end) << endl
-//                 << "synced rate:" << publish_rate << endl;
-
         double consume_rate = 0; // Average rate for consumers.
         //  Wait for consumer(s) to finish.
         if (!opts.summary) cout << "Waiting for consumers done " << endl;
-        for (int i = 0; i < opts.consumers; ++i) {
+        for (size_t i = 0; i < opts.consumers; ++i) {
             string report=control.pop().getData();
             if (!opts.summary)
                 cout << endl << report;
@@ -283,7 +271,16 @@
         int consumed=0;
         AbsTime start=now();
         Message msg;
+        size_t i = 0;
         while ((msg=consume.pop()).getData() != "done") {
+            char* data=const_cast<char*>(msg.getData().data());
+            size_t j=*reinterpret_cast<size_t*>(data);
+            if (i > j)
+                throw Exception(
+                    QPID_MSG("Messages out of order " << i
+                             << " before " << j));
+            else
+                i = j;
             ++consumed;
         }
         msg.acknowledge();      // Ack all outstanding messages -- ??



Mime
View raw message