Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 55523 invoked from network); 3 Dec 2008 02:56:26 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 3 Dec 2008 02:56:26 -0000 Received: (qmail 68480 invoked by uid 500); 3 Dec 2008 02:56:38 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 68470 invoked by uid 500); 3 Dec 2008 02:56:38 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 68461 invoked by uid 99); 3 Dec 2008 02:56:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Dec 2008 18:56:38 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Dec 2008 02:55:08 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0E48D23888A2; Tue, 2 Dec 2008 18:55:56 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r722728 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/Event.cpp qpid/cluster/Event.h tests/Makefile.am tests/tsxtest.cpp Date: Wed, 03 Dec 2008 02:55:55 -0000 To: qpid-commits@incubator.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081203025556.0E48D23888A2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Tue Dec 2 18:55:54 2008 New Revision: 722728 URL: http://svn.apache.org/viewvc?rev=722728&view=rev Log: cluster: add Event size to encoded header. Added: incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp (with props) Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=722728&r1=722727&r2=722728&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Dec 2 18:55:54 2008 @@ -261,7 +261,8 @@ { Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); - deliver(Event::delivered(from, msg, msg_len), l); + framing::Buffer buf(static_cast(msg), msg_len); + deliver(Event::decode(from, buf), l); } void Cluster::deliver(const Event& e, Lock&) { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=722728&r1=722727&r2=722728&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Dec 2 18:55:54 2008 @@ -32,20 +32,21 @@ using framing::Buffer; -const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t); +const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint32_t); Event::Event(EventType t, const ConnectionId& c, size_t s, uint32_t i) : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), id(i) {} -Event Event::delivered(const MemberId& m, void* d, size_t s) { - Buffer buf(static_cast(d), s); +Event Event::decode(const MemberId& m, framing::Buffer& buf) { + assert(buf.available() > OVERHEAD); EventType type((EventType)buf.getOctet()); assert(type == DATA || type == CONTROL); ConnectionId connection(m, reinterpret_cast(buf.getLongLong())); uint32_t id = buf.getLong(); - assert(buf.getPosition() == OVERHEAD); - Event e(type, connection, s-OVERHEAD, id); - memcpy(e.getData(), static_cast(d)+OVERHEAD, s-OVERHEAD); + uint32_t size = buf.getLong(); + Event e(type, connection, size, id); + assert(buf.available() >= size); + memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size); return e; } @@ -63,6 +64,7 @@ b.putOctet(type); b.putLongLong(reinterpret_cast(connectionId.getPointer())); b.putLong(id); + b.putLong(size); assert(b.getPosition() == OVERHEAD); iovec iov[] = { { header, OVERHEAD }, { const_cast(getData()), getSize() } }; return cpg.mcast(iov, sizeof(iov)/sizeof(*iov)); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=722728&r1=722727&r2=722728&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Dec 2 18:55:54 2008 @@ -46,7 +46,7 @@ Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0, uint32_t id=0); /** Create an event copied from delivered data. */ - static Event delivered(const MemberId& m, void* data, size_t size); + static Event decode(const MemberId& m, framing::Buffer&); /** Create an event containing a control */ static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0); Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=722728&r1=722727&r2=722728&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Dec 2 18:55:54 2008 @@ -166,6 +166,10 @@ sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h sender_LDADD=$(lib_client) +check_PROGRAMS+=tsxtest +tsxtest_SOURCES=tsxtest.cpp TestOptions.h ConnectionOptions.h +tsxtest_LDADD=$(lib_client) + TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest Added: incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp?rev=722728&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp Tue Dec 2 18:55:54 2008 @@ -0,0 +1,628 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; + +struct Opts : public qpid::Options { + bool help; + bool sms, smr, tc, report, durable, clean; + string host; + int port; + string host2; + int port2; + int rate; + int start_from; + int id; + int symbols; + int sessions; + int messages; + int depth; + int sym_a; + int sym_b; + + // port 5672 + Opts() : help(false), sms(false), smr(false), tc(false), report(false), durable(false), clean(false), + host("127.0.0.1"), port(5672), host2("127.0.0.1"), port2(5672), + rate(10000), start_from(0), id(0), symbols(10), sessions(10), messages(10), depth(10000), sym_a(0), sym_b(9) + { + addOptions() + ("help", optValue(help), "print this help message") + ("sms", optValue(sms), "session sender") + ("smr", optValue(smr), "session receiver") + ("tc", optValue(tc), "trading component") + ("report", optValue(report), "report results when test is complete") + ("host,h", optValue(host, "HOST"), "Broker host to connect to") + ("port,p", optValue(port, "PORT"), "Broker port to connect to") + ("HOST,H", optValue(host2, "HOST2"), + "Broker host2 to connect to (applicable to TC only)") + ("PORT,P", optValue(port2, "PORT2"), + "Broker port2 to connect to (applicable to TC only)") + ("id", optValue(id, "N"), "Id number for this process") + ("start_from", optValue(start_from, "START"), "Client numbers start from N") + ("symbols", optValue(symbols, "N"), "Use N stock symbols") + ("sessions", optValue(sessions, "N"), "Use N sessions (sms)") + ("messages", optValue(messages, "N"), "Send N messages") + ("durable_msg", optValue(durable), "use durable messages") + ("clean", optValue(clean), "purge queues") + ("depth", optValue(depth, "DEPTH"), "queue depth") + ("rate", optValue(rate, "N"), "Send messages at rate of N per second, 0 means fast as possible.") + ("sym_a", optValue(sym_a, "a"), "Start symbols from TSX.a") + ("sym_b", optValue(sym_b, "b"), "Last symbol is TSX.b") + ; + } +}; + +ostream& operator<<(ostream&o, const Opts& opts) { + o << static_cast(opts) << endl; + return o; +} + +Opts opts; // Global, used by all functions. + +int +getRandomNumber(int low, int high) +{ + struct timeval tv; + gettimeofday(&tv, 0); + + srand48(tv.tv_usec); + double r = drand48(); + + int ret = int( double(high - low + 1) * r) + low; + + return ret; +} + +// Order entry message format. +struct OrderEntry { + int sessionId; + int msgId; + int responses; + int responseId; + int next_sess; + int add_size; + AbsTime time[4]; + + OrderEntry() : sessionId(), msgId(), responses(), responseId(), time() { + add_size = getRandomNumber(200, 500) - sizeof(OrderEntry); + } + void clone(OrderEntry* oe) { + sessionId = oe->sessionId; + msgId = oe->msgId; + responses = oe->responses; + responseId = oe->responseId; + next_sess = oe->next_sess; + add_size = oe->add_size; + time[0] = oe->time[0]; + time[1] = oe->time[1]; + time[2] = oe->time[2]; + time[3] = oe->time[3]; + } +}; +//} __attribute__ ((packed)); + +// Trivial "encoding" just copies struct memory. Assumes all machines +// are same architecture and compiler. +template Message makeMessage(const T& data, const string& key=string(), const string& /*exch*/=string()) { + Message message(string(reinterpret_cast(&data), sizeof(data)) + string(data.add_size, 'A'), key); +// Using default exchange +// message.getDeliveryProperties().setExchange(exch); + if (!opts.durable) { + message.getDeliveryProperties().setDeliveryMode(qpid::framing::TRANSIENT); + } else { + message.getDeliveryProperties().setDeliveryMode(qpid::framing::PERSISTENT); + } + return message; +} + +ostream& printMs(ostream& o, Duration d) { + return o << (d / TIME_MSEC) << "ms"; +} + +// Format a delay as a string. +string delayStr(AbsTime begin, AbsTime end) { + ostringstream os; + printMs(os, Duration(begin, end)); + return os.str(); +} + + string timeStr(AbsTime t) { + ostringstream os; + os << Duration(t); + return os.str(); + } + +ostream& operator << (ostream& o, const OrderEntry& oe) { + return o << "OrderEntry[" + << "session=" << oe.sessionId + << " message=" << oe.msgId + << " responses=" << oe.responses + << " responseId=" << oe.responseId + << " additional size=" << oe.add_size + << " delay[0-1]=" << delayStr(oe.time[0], oe.time[1]) + << " delay[1-2]=" << delayStr(oe.time[2], oe.time[3]) + << " TS[0]=" << timeStr(oe.time[0]) + << " TS[1]=" << timeStr(oe.time[1]) + << " TS[2]=" << timeStr(oe.time[2]) + << " TS[3]=" << timeStr(oe.time[3]) + //<< " body=" << oe.body + << "]"; +} + +const char* RESULTS="results"; + +// Base class for all the clients. +struct Client : public Runnable { + int id; + Connection connection; + AsyncSession amqpSession; + Thread thread; + + Client() : id(opts.id) { + ConnectionSettings cs; + cs.host = opts.host; + cs.port = opts.port; + cs.tcpNoDelay = true; + connection.open(cs); + amqpSession = async(connection.newSession()); + // Declare symbol queues + for (int i = 0; i < opts.symbols; ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=symbolQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=symbolQueue(i), arg::durable=true, arg::arguments=options); + } + // Declare session response queues + //for (int i = 0; i < opts.sessions; ++i) + // amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true); + amqpSession.queueDeclare(arg::queue=RESULTS); + } + + ~Client() { + amqpSession.close(); + connection.close(); + join(); + } + + void start() { + thread = Thread(*this); + } + + void join() { + thread.join(); + thread = Thread(); // Avoid double join. + } + + void sleep_until(const AbsTime& t) { + Duration d(now(), t); + if (int64_t(d) > 0) + qpid::sys::usleep(int64_t(d)/TIME_USEC); + } + + // Name of symbol queue n + string symbolQueue(int n) { + ostringstream sym; + sym << "TSX." << n%opts.symbols; + return sym.str(); + } + + // Name of session n + string sessionQueue(int n) { + ostringstream sym; + sym << "SMR." << n; + return sym.str(); + } + +}; + +struct Clean : public Client { + void run() { + if (opts.durable) + cout << "DURABLE messages" << endl; + else + cout << "TRANSIENT messages" << endl; + cout << "Cleaning queues" << endl; + // Purge any old data in the test queues. + // NOTE: the sync() call waits till the command completes. + // Otherwise we might start the tests before the purges were complete. + for (int i = 0; i < opts.symbols; ++i) { + amqpSession.queuePurge(arg::queue=symbolQueue(i)); + amqpSession.sync(); + } +// for (int i = 0; i < opts.sessions; ++i) { +// amqpSession.queuePurge(arg::queue=sessionQueue(i)); +// amqpSession.sync(); +// } + amqpSession.queuePurge(arg::queue=RESULTS); + amqpSession.sync(); + } +}; + +// Base class for subscriber clients +struct SubClient : public Client, public MessageListener { + SubscriptionManager subs; + SubClient() : subs(amqpSession) {} + ~SubClient() { stop(); } + void stop() { subs.stop(); join(); } +}; + + +struct Sms : public Client { + + Sms() { + // Declare session response queues + int ini = opts.start_from; + for (int i = ini; i < (ini+opts.sessions); ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + } + } + void run() { + int my_id = id+opts.start_from; + cout << "SMS " << my_id << endl; + AbsTime start = now(); + int64_t interval = opts.rate ? TIME_SEC/opts.rate : 0; + for (int i = 0; i < opts.messages; ++i) { + OrderEntry oe; + oe.sessionId = my_id; + oe.msgId = i; + if (opts.rate) + sleep_until(AbsTime(start, i*interval)); + int s = opts.sym_a + i%(opts.sym_b - opts.sym_a); + string sym = symbolQueue(s); + oe.responseId = 0; + oe.responses = ((i+1)%3 == 0) ? 2 : 1; // 2 responses for every 3rd message. + oe.next_sess = oe.sessionId+1; + oe.time[0] = now(); + // Send to queue sym via default exchange + amqpSession.messageTransfer(arg::content=makeMessage(oe, sym)); + if (i && i%1000==0) + cout << "SMS " << my_id << " sent " << i << " messages" << endl; + } + cout << "SMS " << my_id << " done" << endl; + } +}; + + +struct Tc : public SubClient { + + Connection connection2; + AsyncSession out_sess; + bool use_other_session; + int msgs; + Tc() : use_other_session(false), msgs(0) { + if (opts.host != opts.host2) { + ConnectionSettings cs2; + cs2.host = opts.host2; + cs2.port = opts.port2; + cs2.tcpNoDelay = true; + connection2.open(cs2); + out_sess = async(connection2.newSession()); + use_other_session = true; + // Declare session response queues + int ini = opts.start_from; + for (int i = ini; i < ini+opts.sessions; ++i) { + if (opts.clean) + out_sess.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + /// options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + out_sess.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + if(opts.clean) { + out_sess.queuePurge(arg::queue=sessionQueue(i)); + out_sess.sync(); + } + } + } else { + int ini = opts.start_from; + for (int i = ini; i < ini+opts.sessions; ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + if(opts.clean) { + amqpSession.queuePurge(arg::queue=sessionQueue(i)); + amqpSession.sync(); + } + } + //cout << "Warning: source and destination for TC are identical" << endl; + } + } + + ~Tc() { if(use_other_session) connection2.close(); } + + void received(Message& m) { + //assert(m.getData().size() == sizeof(OrderEntry)); + OrderEntry oe(*reinterpret_cast(m.getData().data())); + oe.time[1] = now(); + for (int i = 0; i < oe.responses; ++i) { + msgs++; + oe.responseId = i+1; + string key = sessionQueue(oe.sessionId); + if (i > 0) { + if (oe.sessionId+i == opts.sessions+opts.start_from) { + key = sessionQueue(opts.start_from); + } else { + key = sessionQueue(oe.sessionId+1); + } + } +//if(id == 0) +//cout << "[" << msgs << "] RESPOND TO " << i << "/" << oe.responses << " " << oe.sessionId << " " << key << endl; + + oe.time[2]=now(); + if(use_other_session) { + out_sess.messageTransfer(arg::content=makeMessage(oe, key)); + } else { + amqpSession.messageTransfer(arg::content=makeMessage(oe, key)); + } + if (msgs && msgs%1000==0) + cout << "TC " << id << " processed" << msgs << endl; + //cout << "Tc " << id << " sent to " << key << endl; + } + } + + void run() { + if ((id >= opts.sym_a) && (id <= opts.sym_b)) { + cout << "TC " << symbolQueue(id) << endl; + //QPID_LOG(info, "TC subscribing to " << symbolQueue(id)); + subs.subscribe(*this, symbolQueue(id)); + subs.run(); + } + } +}; + +// Average results per SMR or overall. +struct SMRStats { + int sessionId; + long messages; // Total received. + AbsTime begin, end; // Total time from first sent to last received. + int64_t delay[3]; // Total latency 0-1, 2-3, 0-3 + int64_t max_d; + int add_size; + + SMRStats() : sessionId(), messages(0), begin(), end(), delay(), add_size(0) + { + delay[0]=0; + delay[1]=0; + max_d=0; + } + + int64_t time() const { return Duration(begin, end); } + + long throughput() const { + int64_t timeMs=time()/TIME_MSEC; + return 1000*messages/timeMs; + } + + int64_t latency(int i) const { return delay[i]/messages; } + + void add(const OrderEntry& oe) { + if (messages == 0) begin = oe.time[0]; + end = oe.time[3]; + int64_t d1 = Duration(oe.time[0], oe.time[1]); + int64_t d2 = Duration(oe.time[2], oe.time[3]); + int64_t d3 = Duration(oe.time[0], oe.time[3]); + delay[0] += d1; + delay[1] += d2; + delay[2] += d3; + if (d3 > max_d) max_d = d3; + messages++; + } +}; + + +ostream& operator<<(ostream& o, const SMRStats& r) { + o << "SMRStats[" + << "session=" << r.sessionId + << " time="; + printMs(o, r.time()); + o << " messages=" << r.messages + << " latency(0)=" << r.latency(0) + << " latency(1)=" << r.latency(1) + << " throughput=" << r.throughput() + << "]"; + return o; +} + +struct Smr : public SubClient { + + SMRStats result; + int expect; // # messages to expect + list msgList; + int my_id; + + Smr() : expect(opts.messages + opts.messages/3) { + // Declare session response queues + int ini = opts.start_from; + for (int i = ini; i < ini+opts.sessions; ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + } + } + + void received(Message& m) { + //assert(m.getData().size() == sizeof(OrderEntry)); + OrderEntry oe(*reinterpret_cast(m.getData().data())); + oe.time[3] = now(); + //QPID_LOG(info, "SMR " << my_id << " received: " << oe); +//cout << " id: " << oe.msgId << " session id: " << oe.sessionId << endl; +//" received data: " << (m.getData().data() + sizeof(OrderEntry)) << endl; + result.add(oe); + + OrderEntry* new_oe = new OrderEntry(); + new_oe->clone(&oe); + msgList.push_back(new_oe); + if (result.messages && result.messages%1000==0) + cout << "SMR " << my_id << " received " << result.messages << endl; + if (result.messages == expect) { + subs.stop(); + amqpSession.messageTransfer( + arg::content=makeMessage(result, RESULTS)); + cout << result << endl; + + std::stringstream filename; + filename << "ReceivedMessages" << id+opts.start_from << ".dat"; + ofstream* datFileStream_ = new ofstream(filename.str().c_str(), ios::trunc); + for(list::const_iterator i = msgList.begin(); + i != msgList.end(); ++i) + { + (*datFileStream_) << **i << endl; + delete *i; + } + msgList.clear(); + delete datFileStream_; + } + } + + void run() { + my_id = id + opts.start_from; + cout << "SMR " << my_id << endl; + result.sessionId = my_id; + //QPID_LOG(info, "SMR subscribing to " << sessionQueue(my_id)); + subs.subscribe(*this, sessionQueue(my_id)); + subs.run(); + cout << "SMR " << my_id << " done" << endl; + } +}; + +struct Report : public SubClient { + long reports; + long messages; + int64_t time; + int64_t latency[2]; + int64_t max_l; + long throughput; + + int expect; // # reports expected + Report() : reports(0), messages(0), time(0), latency(), throughput(0), + expect(opts.sessions) { + latency[0] = 0; + latency[1] = 0; + latency[2] = 0; + } + + void received(Message& m) { + max_l = 0; + assert(m.getData().size() == sizeof(SMRStats)); + SMRStats r(*reinterpret_cast(m.getData().data())); + //QPID_LOG(info, "Report received: " << r); + reports++; + messages += r.messages; + time += r.time(); + latency[0] += r.latency(0); + latency[1] += r.latency(1); + latency[2] += r.latency(2); + if (r.max_d > max_l) max_l = r.max_d; + throughput += r.throughput(); + + if (--expect == 0) { + subs.stop(); + cout << endl << "Aggregate results: " << endl + << "Sessions: " << opts.sessions << " (from " << opts.start_from << ")" << endl + << "Messages per smr: " << messages/reports << endl + << "Avg time per smr: "; + printMs(cout, time/reports); + cout << endl + << "Avg throughput (msgs/sec) per smr: " << throughput/reports << endl + << "Avg latency per smr (0-1, 2-3): "; + printMs(cout, latency[0]/reports); + cout << ", "; + printMs(cout, latency[1]/reports); + cout << endl + << "Avg latency (0-3): "; + printMs(cout, (latency[2])/reports); + cout << endl + << "Max message latency: "; + printMs(cout, max_l); + cout << endl; + } + } + + void run() { + //QPID_LOG(info, "Reporter subscribing to " << RESULTS); + subs.subscribe(*this, RESULTS); + subs.run(); + } +}; + +template struct Clients { + boost::ptr_vector clients; + + Clients(bool enabled, int count) { + if (!enabled) return; + for (int i = 0; i < count; ++i) { + C* cc = new C(); + clients.push_back(cc); + clients.back().id = i; + } + } + void run() { + for_each(clients.begin(), clients.end(), mem_fun_ref(&Client::start)); + } + void join() { + for_each(clients.begin(), clients.end(), mem_fun_ref(&Client::join)); + } + void stop() { + for_each(clients.begin(), clients.end(), mem_fun_ref(&SubClient::stop)); + } +}; + +int main(int argc, char** argv) { + try { + opts.parse(argc, argv); + // If no action specified. + if (!opts.sms && !opts.tc && !opts.smr ) { + opts.sms = opts.tc = opts.smr = true; + } + + if (opts.help) + cout << opts; + else { + if (opts.clean) Clean().run(); + Clients sms(opts.sms, opts.sessions); + Clients tc(opts.tc, opts.symbols); + Clients smr(opts.smr, opts.sessions); + + smr.run(); + tc.run(); + sms.run(); + + sms.join(); + smr.join(); + if (opts.sms && opts.smr) + tc.stop(); // Leave TCs running if SMS or SMR are in a different process + else + tc.join(); + if (opts.report) Report().run(); + } + return 0; + } + catch (const exception& e) { + cerr << "Exception: " << e.what() << endl; + return 1; + } +} Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/tsxtest.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date