qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1177443 - in /qpid/branches/qpid-2920-active/qpid/cpp/src: qpid/cluster/exp/MessageHandler.cpp qpid/cluster/exp/QueueContext.cpp qpid/cluster/exp/Settings.cpp tests/cluster2_tests.py tests/qpid-cluster-benchmark tests/qpid-cpp-benchmark
Date Thu, 29 Sep 2011 21:53:53 GMT
Author: aconway
Date: Thu Sep 29 21:53:52 2011
New Revision: 1177443

URL: http://svn.apache.org/viewvc?rev=1177443&view=rev
Log:
QPID-2920: Minor improvements to cluster tests and logging.

Changed default --consume-lock to 10000.

Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1177443&r1=1177442&r2=1177443&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Thu Sep
29 21:53:52 2011
@@ -61,6 +61,7 @@ void MessageHandler::enqueue(const std::
 
     boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
     // FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers.
+    // FIXME aconway 2011-09-28: don't re-decode my own messages
     boost::intrusive_ptr<broker::Message> msg = new broker::Message();
     framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
     msg->decodeHeader(buf);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1177443&r1=1177442&r2=1177443&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Thu Sep
29 21:53:52 2011
@@ -1,3 +1,4 @@
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -56,6 +57,8 @@ bool isOwner(QueueOwnership o) { return 
 void QueueContext::replicaState(
     QueueOwnership before, QueueOwnership after, bool selfDelivered)
 {
+    // No lock, this function does not touch any member variables.
+
     // Invariants for ownership:
     // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
     // SOLE_OWNER <=> timer stopped, queue started
@@ -64,7 +67,9 @@ void QueueContext::replicaState(
     // Interested in state changes and my own events which lead to
     // ownership.
     if ((before != after || selfDelivered) && isOwner(after)) {
-        sys::Mutex::ScopedLock l(lock);
+        QPID_LOG(trace, "cluster start consumers on " << queue.getName() << ",
timer "
+                 << (after==SHARED_OWNER? "start" : "stop"));
+        sys::Mutex::ScopedLock l(lock); // FIXME aconway 2011-09-29: REMOVE
         queue.startConsumers();
         if (after == SHARED_OWNER) timer.start();
         else timer.stop();
@@ -90,6 +95,7 @@ void QueueContext::cancel(size_t n) {
     consumers = n;
     // When consuming threads are stopped, this->stopped will be called.
     if (n == 0) {
+        QPID_LOG(trace, "cluster stop consumers and timer on " << queue.getName());
         timer.stop();
         queue.stopConsumers();
     }
@@ -98,6 +104,7 @@ void QueueContext::cancel(size_t n) {
 // Called in timer thread.
 void QueueContext::timeout() {
     // When all threads have stopped, queue will call stopped()
+    QPID_LOG(trace, "cluster timeout, stopping consumers on " << queue.getName());
     queue.stopConsumers();
 }
 
@@ -105,6 +112,8 @@ void QueueContext::timeout() {
 // Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
     sys::Mutex::ScopedLock l(lock);
+    QPID_LOG(trace, "cluster timeout, stopped consumers on " << queue.getName()
+             << (consumers == 0 ? " unsubscribed" : " resubscribe"));
     if (consumers == 0)
         mcast.mcast(framing::ClusterQueueUnsubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp?rev=1177443&r1=1177442&r2=1177443&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp Thu Sep 29 21:53:52
2011
@@ -25,7 +25,7 @@ namespace qpid {
 namespace cluster {
 
 Settings::Settings() :    // Default settings
-    consumeLockMicros(100000)
+    consumeLockMicros(10000)
 {}
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py?rev=1177443&r1=1177442&r2=1177443&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py Thu Sep 29 21:53:52
2011
@@ -150,6 +150,7 @@ class Cluster2Tests(BrokerTest):
                 self.session = session
                 self.receiver = session.receiver("q")
                 self.messages = []
+                self.error = None
                 Thread.__init__(self)
 
             def run(self):
@@ -158,6 +159,7 @@ class Cluster2Tests(BrokerTest):
                         self.messages.append(self.receiver.fetch(1))
                         self.session.acknowledge()
                 except Empty: pass
+                except Exception,e: self.error = e
 
         cluster = self.cluster(3, cluster2=True)
         connections = [ b.connect() for  b in cluster]
@@ -173,8 +175,11 @@ class Cluster2Tests(BrokerTest):
         while time.time() < t:
             sender.send(str(n))
             n += 1
-        for r in receivers: r.join();
-        for r in receivers: len(r.messages) > n/6 # Fairness test.
+        for r in receivers:
+            r.join();
+            if (r.error): self.fail("Receiver failed: %s" % r.error)
+        for r in receivers:
+            len(r.messages) > n/6 # Fairness test.
         messages = [int(m.content) for r in receivers for m in r.messages ]
         messages.sort()
         self.assertEqual(range(n), messages)

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1177443&r1=1177442&r2=1177443&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark Thu Sep 29 21:53:52
2011
@@ -23,7 +23,7 @@
 # Default options
 MESSAGES="-m 10000"
 FLOW="--flow-control 100"	      # Flow control limit on queue depth for latency.
-REPEAT="--repeat 10"
+REPEAT="--repeat 5"
 QUEUES="-q 4"
 SENDERS="-s 3"
 RECEIVERS="-r 3"
@@ -43,9 +43,10 @@ while getopts "m:f:n:b:q:s:r:c:" opt; do
 	*) echo "Unknown option"; exit 1;;
     esac
 done
-
+BROKER=$(echo $BROKERS | sed s/,.*//)
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
-run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS
$MESSAGES $CLIENT_HOSTS
-run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}"
$MESSAGES $FLOW $CLIENT_HOSTS
+run_test "Multiple active brokers:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES
$SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
+run_test "Single active broker :" qpid-cpp-benchmark $REPEAT $BROKER --summarize $QUEUES
$SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
+run_test "Latency under low load:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options
"{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1177443&r1=1177442&r2=1177443&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark Thu Sep 29 21:53:52
2011
@@ -177,9 +177,9 @@ def recreate_queues(queues, brokers):
     c.close()
 
 def print_header(timestamp):
-    if timestamp: latency_header="\tl-min\tl-max\tl-avg"
+    if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
     else: latency_header=""
-    print "send-tp\t\trecv-tp%s"%latency_header
+    print "send-tp\trecv-tp%s"%latency_header
 
 def parse(parser, lines):               # Parse sender/receiver output
     return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
@@ -190,25 +190,29 @@ def parse_senders(senders):
 def parse_receivers(receivers):
     return parse([int,float,float,float],[first_line(p) for p in receivers if p])
 
-def print_data(send_stats, recv_stats):
+def print_data(send_stats, recv_stats, total_tp):
     for send,recv in map(None, send_stats, recv_stats):
         line=""
         if send: line += "%d"%send[0]
         if recv:
-            line += "\t\t%d"%recv[0]
+            line += "\t%d"%recv[0]
             if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+        if total_tp is not None:
+            line += "\t%d"%total_tp
+            total_tp = None
         print line
 
-def print_summary(send_stats, recv_stats):
+def print_summary(send_stats, recv_stats, total_tp):
     def avg(s): sum(s) / len(s)
     send_tp = sum([l[0] for l in send_stats])
     recv_tp = sum([l[0] for l in recv_stats])
-    summary = "%d\t\t%d"%(send_tp, recv_tp)
+    summary = "%d\t%d"%(send_tp, recv_tp)
     if recv_stats and len(recv_stats[0]) == 4:
         l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
         l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
         l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
         summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+    summary += "\t%d"%total_tp
     print summary
 
 
@@ -269,13 +273,17 @@ def main():
                                        brokers.next(), client_hosts.next())
                          for q in queues for j in xrange(opts.receivers)]
             ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+            start = time.time()
             senders = [start_send(q, opts,brokers.next(), client_hosts.next())
                        for q in queues for j in xrange(opts.senders)]
             if opts.report_header and i == 0: print_header(opts.timestamp)
+            for p in senders + receivers: p.wait()
+            total_sent = opts.queues * opts.senders * opts.messages
+            total_tp = total_sent / (time.time()-start)
             send_stats=parse_senders(senders)
             recv_stats=parse_receivers(receivers)
-            if opts.summarize: print_summary(send_stats, recv_stats)
-            else: print_data(send_stats, recv_stats)
+            if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+            else: print_data(send_stats, recv_stats, total_tp)
     finally: clients.kill()             # No strays
 
 if __name__ == "__main__": main()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message